• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_TIMER_HPP)
6 #define RXCPP_SOURCES_RX_TIMER_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 /*! \file rx-timer.hpp
11 
12     \brief Returns an observable that emits an integer at the specified time point.
13 
14     \tparam Coordination  the type of the scheduler (optional)
15 
16     \param  when  time point when the value is emitted
17     \param  cn    the scheduler to use for scheduling the items (optional)
18 
19     \return  Observable that emits an integer at the specified time point
20 
21     \sample
22     \snippet timer.cpp timepoint timer sample
23     \snippet output.txt timepoint timer sample
24 
25     \sample
26     \snippet timer.cpp duration timer sample
27     \snippet output.txt duration timer sample
28 
29     \sample
30     \snippet timer.cpp threaded timepoint timer sample
31     \snippet output.txt threaded timepoint timer sample
32 
33     \sample
34     \snippet timer.cpp threaded duration timer sample
35     \snippet output.txt threaded duration timer sample
36 */
37 
38 namespace rxcpp {
39 
40 namespace sources {
41 
42 namespace detail {
43 
44 template<class Coordination>
45 struct timer : public source_base<long>
46 {
47     typedef timer<Coordination> this_type;
48 
49     typedef rxu::decay_t<Coordination> coordination_type;
50     typedef typename coordination_type::coordinator_type coordinator_type;
51 
52     struct timer_initial_type
53     {
timer_initial_typerxcpp::sources::detail::timer::timer_initial_type54         timer_initial_type(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
55             : when(t)
56             , coordination(std::move(cn))
57         {
58         }
59         rxsc::scheduler::clock_type::time_point when;
60         coordination_type coordination;
61     };
62     timer_initial_type initial;
63 
timerrxcpp::sources::detail::timer64     timer(rxsc::scheduler::clock_type::time_point t, coordination_type cn)
65         : initial(t, std::move(cn))
66     {
67     }
timerrxcpp::sources::detail::timer68     timer(rxsc::scheduler::clock_type::duration p, coordination_type cn)
69         : initial(rxsc::scheduler::clock_type::time_point(), std::move(cn))
70     {
71         initial.when = initial.coordination.now() + p;
72     }
73     template<class Subscriber>
on_subscriberxcpp::sources::detail::timer74     void on_subscribe(Subscriber o) const {
75         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
76 
77         // creates a worker whose lifetime is the same as this subscription
78         auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
79         auto controller = coordinator.get_worker();
80 
81         auto producer = [o](const rxsc::schedulable&) {
82             // send the value and complete
83             o.on_next(1L);
84             o.on_completed();
85         };
86 
87         auto selectedProducer = on_exception(
88             [&](){return coordinator.act(producer);},
89             o);
90         if (selectedProducer.empty()) {
91             return;
92         }
93 
94         controller.schedule(initial.when, selectedProducer.get());
95     }
96 };
97 
98 template<class TimePointOrDuration, class Coordination>
99 struct defer_timer : public defer_observable<
100     rxu::all_true<
101         std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::time_point>::value ||
102         std::is_convertible<TimePointOrDuration, rxsc::scheduler::clock_type::duration>::value,
103         is_coordination<Coordination>::value>,
104     void,
105     timer, Coordination>
106 {
107 };
108 
109 }
110 
111 /*! @copydoc rx-timer.hpp
112  */
113 template<class TimePointOrDuration>
timer(TimePointOrDuration when)114 auto timer(TimePointOrDuration when)
115     ->  typename std::enable_if<
116                     detail::defer_timer<TimePointOrDuration, identity_one_worker>::value,
117         typename    detail::defer_timer<TimePointOrDuration, identity_one_worker>::observable_type>::type {
118     return          detail::defer_timer<TimePointOrDuration, identity_one_worker>::make(when, identity_current_thread());
119 }
120 
121 /*! @copydoc rx-timer.hpp
122  */
123 template<class TimePointOrDuration, class Coordination>
timer(TimePointOrDuration when,Coordination cn)124 auto timer(TimePointOrDuration when, Coordination cn)
125     ->  typename std::enable_if<
126                     detail::defer_timer<TimePointOrDuration, Coordination>::value,
127         typename    detail::defer_timer<TimePointOrDuration, Coordination>::observable_type>::type {
128     return          detail::defer_timer<TimePointOrDuration, Coordination>::make(when, std::move(cn));
129 }
130 
131 }
132 
133 }
134 
135 #endif
136