// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_SOURCES_RX_RANGE_HPP) #define RXCPP_SOURCES_RX_RANGE_HPP #include "../rx-includes.hpp" /*! \file rx-range.hpp \brief Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler. \tparam T the type of the values that this observable emits \tparam Coordination the type of the scheduler (optional) \param first first value to send (optional) \param last last value to send (optional) \param step value to add to the previous value to get the next value (optional) \param cn the scheduler to run the generator loop on (optional) \return Observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value using the specified scheduler. \sample \snippet range.cpp threaded range sample \snippet output.txt threaded range sample An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator \snippet range.cpp subscribe_on range sample \snippet output.txt subscribe_on range sample */ namespace rxcpp { namespace sources { namespace detail { template struct range : public source_base { typedef rxu::decay_t coordination_type; typedef typename coordination_type::coordinator_type coordinator_type; struct range_state_type { range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn) : next(f) , last(l) , step(s) , coordination(std::move(cn)) { } mutable T next; T last; std::ptrdiff_t step; coordination_type coordination; }; range_state_type initial; range(T f, T l, std::ptrdiff_t s, coordination_type cn) : initial(f, l, s, std::move(cn)) { } template void on_subscribe(Subscriber o) const { static_assert(is_subscriber::value, "subscribe must be passed a subscriber"); // creates a worker whose lifetime is the same as this subscription auto coordinator = initial.coordination.create_coordinator(o.get_subscription()); auto controller = coordinator.get_worker(); auto state = initial; auto producer = [=](const rxsc::schedulable& self){ auto& dest = o; if (!dest.is_subscribed()) { // terminate loop return; } // send next value dest.on_next(state.next); if (!dest.is_subscribed()) { // terminate loop return; } if (std::max(state.last, state.next) - std::min(state.last, state.next) < std::abs(state.step)) { if (state.last != state.next) { dest.on_next(state.last); } dest.on_completed(); // o is unsubscribed return; } state.next = static_cast(state.step + state.next); // tail recurse this same action to continue loop self(); }; auto selectedProducer = on_exception( [&](){return coordinator.act(producer);}, o); if (selectedProducer.empty()) { return; } controller.schedule(selectedProducer.get()); } }; } /*! @copydoc rx-create.hpp */ template auto range(T first = 0, T last = std::numeric_limits::max(), std::ptrdiff_t step = 1) -> observable> { return observable>( detail::range(first, last, step, identity_current_thread())); } /*! @copydoc rx-create.hpp */ template auto range(T first, T last, std::ptrdiff_t step, Coordination cn) -> observable> { return observable>( detail::range(first, last, step, std::move(cn))); } /*! @copydoc rx-create.hpp */ template auto range(T first, T last, Coordination cn) -> typename std::enable_if::value, observable>>::type { return observable>( detail::range(first, last, 1, std::move(cn))); } /*! @copydoc rx-create.hpp */ template auto range(T first, Coordination cn) -> typename std::enable_if::value, observable>>::type { return observable>( detail::range(first, std::numeric_limits::max(), 1, std::move(cn))); } } } #endif