// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_SOURCES_RX_TIMER_HPP) #define RXCPP_SOURCES_RX_TIMER_HPP #include "../rx-includes.hpp" /*! \file rx-timer.hpp \brief Returns an observable that emits an integer at the specified time point. \tparam Coordination the type of the scheduler (optional) \param when time point when the value is emitted \param cn the scheduler to use for scheduling the items (optional) \return Observable that emits an integer at the specified time point \sample \snippet timer.cpp timepoint timer sample \snippet output.txt timepoint timer sample \sample \snippet timer.cpp duration timer sample \snippet output.txt duration timer sample \sample \snippet timer.cpp threaded timepoint timer sample \snippet output.txt threaded timepoint timer sample \sample \snippet timer.cpp threaded duration timer sample \snippet output.txt threaded duration timer sample */ namespace rxcpp { namespace sources { namespace detail { template struct timer : public source_base { typedef timer this_type; typedef rxu::decay_t coordination_type; typedef typename coordination_type::coordinator_type coordinator_type; struct timer_initial_type { timer_initial_type(rxsc::scheduler::clock_type::time_point t, coordination_type cn) : when(t) , coordination(std::move(cn)) { } rxsc::scheduler::clock_type::time_point when; coordination_type coordination; }; timer_initial_type initial; timer(rxsc::scheduler::clock_type::time_point t, coordination_type cn) : initial(t, std::move(cn)) { } timer(rxsc::scheduler::clock_type::duration p, coordination_type cn) : initial(rxsc::scheduler::clock_type::time_point(), std::move(cn)) { initial.when = initial.coordination.now() + p; } template void on_subscribe(Subscriber o) const { static_assert(is_subscriber::value, "subscribe must be passed a subscriber"); // creates a worker whose lifetime is the same as this subscription auto coordinator = initial.coordination.create_coordinator(o.get_subscription()); auto controller = coordinator.get_worker(); auto producer = [o](const rxsc::schedulable&) { // send the value and complete o.on_next(1L); o.on_completed(); }; auto selectedProducer = on_exception( [&](){return coordinator.act(producer);}, o); if (selectedProducer.empty()) { return; } controller.schedule(initial.when, selectedProducer.get()); } }; template struct defer_timer : public defer_observable< rxu::all_true< std::is_convertible::value || std::is_convertible::value, is_coordination::value>, void, timer, Coordination> { }; } /*! @copydoc rx-timer.hpp */ template auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer::value, typename detail::defer_timer::observable_type>::type { return detail::defer_timer::make(when, identity_current_thread()); } /*! @copydoc rx-timer.hpp */ template auto timer(TimePointOrDuration when, Coordination cn) -> typename std::enable_if< detail::defer_timer::value, typename detail::defer_timer::observable_type>::type { return detail::defer_timer::make(when, std::move(cn)); } } } #endif