1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_SOURCES_RX_TIMER_HPP)
6 #define RXCPP_SOURCES_RX_TIMER_HPP
7
8 #include "../rx-includes.hpp"
9
10 /*! \file rx-timer.hpp
11
12 \brief Returns an observable that emits an integer at the specified time point.
13
14 \tparam Coordination the type of the scheduler (optional)
15
\param when time point at which the value is emitted, or a duration that is added to the coordination's current time to obtain that time point
17 \param cn the scheduler to use for scheduling the items (optional)
18
19 \return Observable that emits an integer at the specified time point
20
21 \sample
22 \snippet timer.cpp timepoint timer sample
23 \snippet output.txt timepoint timer sample
24
25 \sample
26 \snippet timer.cpp duration timer sample
27 \snippet output.txt duration timer sample
28
29 \sample
30 \snippet timer.cpp threaded timepoint timer sample
31 \snippet output.txt threaded timepoint timer sample
32
33 \sample
34 \snippet timer.cpp threaded duration timer sample
35 \snippet output.txt threaded duration timer sample
36 */
37
38 namespace rxcpp {
39
40 namespace sources {
41
42 namespace detail {
43
44 template<class Coordination>
45 struct timer : public source_base<long>
46 {
47 typedef timer<Coordination> this_type;
48
49 typedef rxu::decay_t<Coordination> coordination_type;
50 typedef typename coordination_type::coordinator_type coordinator_type;
51
52 struct timer_initial_type
53 {
timer_initial_typerxcpp::sources::detail::timer::timer_initial_type54 timer_initial_type(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
55 : when(t)
56 , coordination(std::move(cn))
57 {
58 }
59 rxsc::scheduler::clock_type::time_point when;
60 coordination_type coordination;
61 };
62 timer_initial_type initial;
63
timerrxcpp::sources::detail::timer64 timer(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
65 : initial(t, std::move(cn))
66 {
67 }
timerrxcpp::sources::detail::timer68 timer(rxsc::scheduler::clock_type::duration p, coordination_type cn)
69 : initial(rxsc::scheduler::clock_type::time_point(), std::move(cn))
70 {
71 initial.when = initial.coordination.now() + p;
72 }
73 template<class Subscriber>
on_subscriberxcpp::sources::detail::timer74 void on_subscribe(Subscriber o) const {
75 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
76
77 // creates a worker whose lifetime is the same as this subscription
78 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
79 auto controller = coordinator.get_worker();
80
81 auto producer = [o](const rxsc::schedulable&) {
82 // send the value and complete
83 o.on_next(1L);
84 o.on_completed();
85 };
86
87 auto selectedProducer = on_exception(
88 [&](){return coordinator.act(producer);},
89 o);
90 if (selectedProducer.empty()) {
91 return;
92 }
93
94 controller.schedule(initial.when, selectedProducer.get());
95 }
96 };
97
98 template<class TimePointOrDuration, class Coordination>
99 struct defer_timer : public defer_observable<
100 rxu::all_true<
101 std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
102 std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value,
103 is_coordination<Coordination>::value>,
104 void,
105 timer, Coordination>
106 {
107 };
108
109 }
110
111 /*! @copydoc rx-timer.hpp
112 */
113 template<class TimePointOrDuration>
timer(TimePointOrDuration when)114 auto timer(TimePointOrDuration when)
115 -> typename std::enable_if<
116 detail::defer_timer<TimePointOrDuration, identity_one_worker>::value,
117 typename detail::defer_timer<TimePointOrDuration, identity_one_worker>::observable_type>::type {
118 return detail::defer_timer<TimePointOrDuration, identity_one_worker>::make(when, identity_current_thread());
119 }
120
121 /*! @copydoc rx-timer.hpp
122 */
123 template<class TimePointOrDuration, class Coordination>
timer(TimePointOrDuration when,Coordination cn)124 auto timer(TimePointOrDuration when, Coordination cn)
125 -> typename std::enable_if<
126 detail::defer_timer<TimePointOrDuration, Coordination>::value,
127 typename detail::defer_timer<TimePointOrDuration, Coordination>::observable_type>::type {
128 return detail::defer_timer<TimePointOrDuration, Coordination>::make(when, std::move(cn));
129 }
130
131 }
132
133 }
134
135 #endif
136