// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
/*! \file rx-sample_time.hpp

    \brief Return an Observable that emits the most recent item emitted by the source Observable within each periodic time interval.

    \tparam Duration      the type of the time interval.
    \tparam Coordination  the type of the scheduler (optional).

    \param period        the period of time at which to sample the source observable.
    \param coordination  the scheduler for the items (optional).

    \return  Observable that emits the most recently emitted item since the previous sampling.

    \sample
    \snippet sample.cpp sample period sample
    \snippet output.txt sample period sample
*/
21 
22 #if !defined(RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP)
23 #define RXCPP_OPERATORS_RX_SAMPLE_WITH_TIME_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
/// Empty marker type that records the argument pack `AN...` of a
/// `sample_with_time` call that matched no supported overload.
/// It has no members; only its template arguments carry information.
template<class... AN>
struct sample_with_time_invalid_arguments {};
35 
36 template<class... AN>
37 struct sample_with_time_invalid : public rxo::operator_base<sample_with_time_invalid_arguments<AN...>> {
38     using type = observable<sample_with_time_invalid_arguments<AN...>, sample_with_time_invalid<AN...>>;
39 };
40 template<class... AN>
41 using sample_with_time_invalid_t = typename sample_with_time_invalid<AN...>::type;
42 
43 template<class T, class Duration, class Coordination>
44 struct sample_with_time
45 {
46     typedef rxu::decay_t<T> source_value_type;
47     typedef rxu::decay_t<Coordination> coordination_type;
48     typedef typename coordination_type::coordinator_type coordinator_type;
49     typedef rxu::decay_t<Duration> duration_type;
50 
51     struct sample_with_time_value
52     {
sample_with_time_valuerxcpp::operators::detail::sample_with_time::sample_with_time_value53         sample_with_time_value(duration_type p, coordination_type c)
54             : period(p)
55             , coordination(c)
56         {
57         }
58         duration_type period;
59         coordination_type coordination;
60     };
61     sample_with_time_value initial;
62 
sample_with_timerxcpp::operators::detail::sample_with_time63     sample_with_time(duration_type period, coordination_type coordination)
64         : initial(period, coordination)
65     {
66     }
67 
68     template<class Subscriber>
69     struct sample_with_time_observer
70     {
71         typedef sample_with_time_observer<Subscriber> this_type;
72         typedef T value_type;
73         typedef rxu::decay_t<Subscriber> dest_type;
74         typedef observer<value_type, this_type> observer_type;
75 
76         struct sample_with_time_subscriber_value : public sample_with_time_value
77         {
sample_with_time_subscriber_valuerxcpp::operators::detail::sample_with_time::sample_with_time_observer::sample_with_time_subscriber_value78             sample_with_time_subscriber_value(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
79                 : sample_with_time_value(v)
80                 , cs(std::move(cs))
81                 , dest(std::move(d))
82                 , coordinator(std::move(c))
83                 , worker(coordinator.get_worker())
84             {
85             }
86             composite_subscription cs;
87             dest_type dest;
88             coordinator_type coordinator;
89             rxsc::worker worker;
90             mutable rxu::maybe<value_type> value;
91         };
92         std::shared_ptr<sample_with_time_subscriber_value> state;
93 
sample_with_time_observerrxcpp::operators::detail::sample_with_time::sample_with_time_observer94         sample_with_time_observer(composite_subscription cs, dest_type d, sample_with_time_value v, coordinator_type c)
95             : state(std::make_shared<sample_with_time_subscriber_value>(sample_with_time_subscriber_value(std::move(cs), std::move(d), v, std::move(c))))
96         {
97             auto localState = state;
98 
99             auto disposer = [=](const rxsc::schedulable&){
100                 localState->cs.unsubscribe();
101                 localState->dest.unsubscribe();
102                 localState->worker.unsubscribe();
103             };
104             auto selectedDisposer = on_exception(
105                 [&](){ return localState->coordinator.act(disposer); },
106                 localState->dest);
107             if (selectedDisposer.empty()) {
108                 return;
109             }
110 
111             localState->dest.add([=](){
112                 localState->worker.schedule(selectedDisposer.get());
113             });
114             localState->cs.add([=](){
115                 localState->worker.schedule(selectedDisposer.get());
116             });
117 
118             auto produce_sample = [localState](const rxsc::schedulable&) {
119                 if(!localState->value.empty()) {
120                     localState->dest.on_next(*localState->value);
121                     localState->value.reset();
122                 }
123             };
124             auto selectedProduce = on_exception(
125                 [&](){ return localState->coordinator.act(produce_sample); },
126                 localState->dest);
127             if (selectedProduce.empty()) {
128                 return;
129             }
130 
131             state->worker.schedule_periodically(
132                 localState->worker.now(),
133                 localState->period,
134                 [localState, selectedProduce](const rxsc::schedulable&) {
135                     localState->worker.schedule(selectedProduce.get());
136                 });
137         }
138 
on_nextrxcpp::operators::detail::sample_with_time::sample_with_time_observer139         void on_next(T v) const {
140             auto localState = state;
141             auto work = [v, localState](const rxsc::schedulable&) {
142                 localState->value.reset(v);
143             };
144             auto selectedWork = on_exception(
145                 [&](){ return localState->coordinator.act(work); },
146                 localState->dest);
147             if (selectedWork.empty()) {
148                 return;
149             }
150             localState->worker.schedule(selectedWork.get());
151         }
152 
on_errorrxcpp::operators::detail::sample_with_time::sample_with_time_observer153         void on_error(rxu::error_ptr e) const {
154             auto localState = state;
155             auto work = [e, localState](const rxsc::schedulable&) {
156                 localState->dest.on_error(e);
157             };
158             auto selectedWork = on_exception(
159                 [&](){ return localState->coordinator.act(work); },
160                 localState->dest);
161             if (selectedWork.empty()) {
162                 return;
163             }
164             localState->worker.schedule(selectedWork.get());
165         }
166 
on_completedrxcpp::operators::detail::sample_with_time::sample_with_time_observer167         void on_completed() const {
168             auto localState = state;
169             auto work = [localState](const rxsc::schedulable&) {
170                 localState->dest.on_completed();
171             };
172             auto selectedWork = on_exception(
173                 [&](){ return localState->coordinator.act(work); },
174                 localState->dest);
175             if (selectedWork.empty()) {
176                 return;
177             }
178             localState->worker.schedule(selectedWork.get());
179         }
180 
makerxcpp::operators::detail::sample_with_time::sample_with_time_observer181         static subscriber<T, observer<T, this_type>> make(dest_type d, sample_with_time_value v) {
182             auto cs = composite_subscription();
183             auto coordinator = v.coordination.create_coordinator();
184 
185             return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
186         }
187     };
188 
189     template<class Subscriber>
operator ()rxcpp::operators::detail::sample_with_time190     auto operator()(Subscriber dest) const
191         -> decltype(sample_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
192         return      sample_with_time_observer<Subscriber>::make(std::move(dest), initial);
193     }
194 };
195 
196 }
197 
198 /*! @copydoc rx-sample_time.hpp
199 */
200 template<class... AN>
sample_with_time(AN &&...an)201 auto sample_with_time(AN&&... an)
202     ->      operator_factory<sample_with_time_tag, AN...> {
203      return operator_factory<sample_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
204 }
205 
206 }
207 
208 template<>
209 struct member_overload<sample_with_time_tag>
210 {
211     template<class Observable, class Duration,
212         class Enabled = rxu::enable_if_all_true_type_t<
213             is_observable<Observable>,
214             rxu::is_duration<Duration>>,
215         class SourceValue = rxu::value_type_t<Observable>,
216         class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload217     static auto member(Observable&& o, Duration&& d)
218         -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()))) {
219         return      o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), identity_current_thread()));
220     }
221 
222     template<class Observable, class Coordination, class Duration,
223         class Enabled = rxu::enable_if_all_true_type_t<
224             is_observable<Observable>,
225             is_coordination<Coordination>,
226             rxu::is_duration<Duration>>,
227         class SourceValue = rxu::value_type_t<Observable>,
228         class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload229     static auto member(Observable&& o, Coordination&& cn, Duration&& d)
230         -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
231         return      o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
232     }
233 
234     template<class Observable, class Coordination, class Duration,
235         class Enabled = rxu::enable_if_all_true_type_t<
236             is_observable<Observable>,
237             is_coordination<Coordination>,
238             rxu::is_duration<Duration>>,
239         class SourceValue = rxu::value_type_t<Observable>,
240         class SampleWithTime = rxo::detail::sample_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload241     static auto member(Observable&& o, Duration&& d, Coordination&& cn)
242         -> decltype(o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243         return      o.template lift<SourceValue>(SampleWithTime(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244     }
245 
246     template<class... AN>
memberrxcpp::member_overload247     static operators::detail::sample_with_time_invalid_t<AN...> member(const AN&...) {
248         std::terminate();
249         return {};
250         static_assert(sizeof...(AN) == 10000, "sample_with_time takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
251     }
252 };
253 
254 }
255 
256 #endif
257