1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-time_interval.hpp
6
7 \brief Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
8 The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.
9
10 \tparam Coordination the type of the scheduler.
11
12 \param coordination the scheduler for time intervals.
13
14 \return Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
15
16 \sample
17 \snippet time_interval.cpp time_interval sample
18 \snippet output.txt time_interval sample
19 */
20
21 #if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP)
22 #define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP
23
24 #include "../rx-includes.hpp"
25
26 namespace rxcpp {
27
28 namespace operators {
29
30 namespace detail {
31
32 template<class... AN>
33 struct time_interval_invalid_arguments {};
34
35 template<class... AN>
36 struct time_interval_invalid : public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
37 using type = observable<time_interval_invalid_arguments<AN...>, time_interval_invalid<AN...>>;
38 };
39 template<class... AN>
40 using time_interval_invalid_t = typename time_interval_invalid<AN...>::type;
41
42 template<class T, class Coordination>
43 struct time_interval
44 {
45 typedef rxu::decay_t<T> source_value_type;
46 typedef rxu::decay_t<Coordination> coordination_type;
47
48 struct time_interval_values {
time_interval_valuesrxcpp::operators::detail::time_interval::time_interval_values49 time_interval_values(coordination_type c)
50 : coordination(c)
51 {
52 }
53
54 coordination_type coordination;
55 };
56 time_interval_values initial;
57
time_intervalrxcpp::operators::detail::time_interval58 time_interval(coordination_type coordination)
59 : initial(coordination)
60 {
61 }
62
63 template<class Subscriber>
64 struct time_interval_observer
65 {
66 typedef time_interval_observer<Subscriber> this_type;
67 typedef source_value_type value_type;
68 typedef rxu::decay_t<Subscriber> dest_type;
69 typedef observer<value_type, this_type> observer_type;
70 typedef rxsc::scheduler::clock_type::time_point time_point;
71 dest_type dest;
72 coordination_type coord;
73 mutable time_point last;
74
time_interval_observerrxcpp::operators::detail::time_interval::time_interval_observer75 time_interval_observer(dest_type d, coordination_type coordination)
76 : dest(std::move(d)),
77 coord(std::move(coordination)),
78 last(coord.now())
79 {
80 }
81
on_nextrxcpp::operators::detail::time_interval::time_interval_observer82 void on_next(source_value_type) const {
83 time_point now = coord.now();
84 dest.on_next(now - last);
85 last = now;
86 }
on_errorrxcpp::operators::detail::time_interval::time_interval_observer87 void on_error(rxu::error_ptr e) const {
88 dest.on_error(e);
89 }
on_completedrxcpp::operators::detail::time_interval::time_interval_observer90 void on_completed() const {
91 dest.on_completed();
92 }
93
makerxcpp::operators::detail::time_interval::time_interval_observer94 static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
95 return make_subscriber<value_type>(d, this_type(d, v.coordination));
96 }
97 };
98
99 template<class Subscriber>
operator ()rxcpp::operators::detail::time_interval100 auto operator()(Subscriber dest) const
101 -> decltype(time_interval_observer<Subscriber>::make(std::move(dest), initial)) {
102 return time_interval_observer<Subscriber>::make(std::move(dest), initial);
103 }
104 };
105
106 }
107
108 /*! @copydoc rx-time_interval.hpp
109 */
110 template<class... AN>
time_interval(AN &&...an)111 auto time_interval(AN&&... an)
112 -> operator_factory<time_interval_tag, AN...> {
113 return operator_factory<time_interval_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
114 }
115
116 }
117
118 template<>
119 struct member_overload<time_interval_tag>
120 {
121 template<class Observable,
122 class Enabled = rxu::enable_if_all_true_type_t<
123 is_observable<Observable>>,
124 class SourceValue = rxu::value_type_t<Observable>,
125 class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
126 class Value = typename rxsc::scheduler::clock_type::time_point::duration>
memberrxcpp::member_overload127 static auto member(Observable&& o)
128 -> decltype(o.template lift<Value>(TimeInterval(identity_current_thread()))) {
129 return o.template lift<Value>(TimeInterval(identity_current_thread()));
130 }
131
132 template<class Observable, class Coordination,
133 class Enabled = rxu::enable_if_all_true_type_t<
134 is_observable<Observable>,
135 is_coordination<Coordination>>,
136 class SourceValue = rxu::value_type_t<Observable>,
137 class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
138 class Value = typename rxsc::scheduler::clock_type::time_point::duration>
memberrxcpp::member_overload139 static auto member(Observable&& o, Coordination&& cn)
140 -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
141 return o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
142 }
143
144 template<class... AN>
memberrxcpp::member_overload145 static operators::detail::time_interval_invalid_t<AN...> member(AN...) {
146 std::terminate();
147 return {};
148 static_assert(sizeof...(AN) == 10000, "time_interval takes (optional Coordination)");
149 }
150 };
151
152 }
153
154 #endif
155