// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-timeout.hpp \brief Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable. \tparam Duration the type of time interval. \tparam Coordination the type of the scheduler (optional). \param period the period of time wait for another item from the source observable. \param coordination the scheduler to manage timeout for each event (optional). \return Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable. \sample \snippet timeout.cpp timeout sample \snippet output.txt timeout sample */ #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP) #define RXCPP_OPERATORS_RX_TIMEOUT_HPP #include "../rx-includes.hpp" namespace rxcpp { class timeout_error: public std::runtime_error { public: explicit timeout_error(const std::string& msg): std::runtime_error(msg) {} }; namespace operators { namespace detail { template struct timeout_invalid_arguments {}; template struct timeout_invalid : public rxo::operator_base> { using type = observable, timeout_invalid>; }; template using timeout_invalid_t = typename timeout_invalid::type; template struct timeout { typedef rxu::decay_t source_value_type; typedef rxu::decay_t coordination_type; typedef typename coordination_type::coordinator_type coordinator_type; typedef rxu::decay_t duration_type; struct timeout_values { timeout_values(duration_type p, coordination_type c) : period(p) , coordination(c) { } duration_type period; coordination_type coordination; }; timeout_values initial; timeout(duration_type period, coordination_type coordination) : initial(period, coordination) { } template struct timeout_observer { typedef timeout_observer this_type; typedef rxu::decay_t value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; struct timeout_subscriber_values : public timeout_values { timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c) : timeout_values(v) , cs(std::move(cs)) , dest(std::move(d)) , coordinator(std::move(c)) , worker(coordinator.get_worker()) , index(0) { } composite_subscription cs; dest_type dest; coordinator_type coordinator; rxsc::worker worker; mutable std::size_t index; }; typedef std::shared_ptr state_type; state_type state; timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c) : state(std::make_shared(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) { auto localState = state; auto disposer = [=](const rxsc::schedulable&){ localState->cs.unsubscribe(); localState->dest.unsubscribe(); localState->worker.unsubscribe(); }; auto selectedDisposer = on_exception( [&](){ return localState->coordinator.act(disposer); }, localState->dest); if (selectedDisposer.empty()) { return; } localState->dest.add([=](){ localState->worker.schedule(selectedDisposer.get()); }); localState->cs.add([=](){ localState->worker.schedule(selectedDisposer.get()); }); auto work = [v, localState](const rxsc::schedulable&) { auto new_id = ++localState->index; auto produce_time = localState->worker.now() + localState->period; localState->worker.schedule(produce_time, produce_timeout(new_id, localState)); }; auto selectedWork = on_exception( [&](){return localState->coordinator.act(work);}, localState->dest); if (selectedWork.empty()) { return; } localState->worker.schedule(selectedWork.get()); } static std::function produce_timeout(std::size_t id, state_type state) { auto produce = [id, state](const rxsc::schedulable&) { if(id != state->index) return; state->dest.on_error(rxu::make_error_ptr(rxcpp::timeout_error("timeout has occurred"))); }; auto selectedProduce = on_exception( [&](){ return state->coordinator.act(produce); }, state->dest); if (selectedProduce.empty()) { return std::function(); } return std::function(selectedProduce.get()); } void on_next(T v) const { auto localState = state; auto work = [v, localState](const rxsc::schedulable&) { auto new_id = ++localState->index; auto produce_time = localState->worker.now() + localState->period; localState->dest.on_next(v); localState->worker.schedule(produce_time, produce_timeout(new_id, localState)); }; auto selectedWork = on_exception( [&](){return localState->coordinator.act(work);}, localState->dest); if (selectedWork.empty()) { return; } localState->worker.schedule(selectedWork.get()); } void on_error(rxu::error_ptr e) const { auto localState = state; auto work = [e, localState](const rxsc::schedulable&) { localState->dest.on_error(e); }; auto selectedWork = on_exception( [&](){ return localState->coordinator.act(work); }, localState->dest); if (selectedWork.empty()) { return; } localState->worker.schedule(selectedWork.get()); } void on_completed() const { auto localState = state; auto work = [localState](const rxsc::schedulable&) { localState->dest.on_completed(); }; auto selectedWork = on_exception( [&](){ return localState->coordinator.act(work); }, localState->dest); if (selectedWork.empty()) { return; } localState->worker.schedule(selectedWork.get()); } static subscriber make(dest_type d, timeout_values v) { auto cs = composite_subscription(); auto coordinator = v.coordination.create_coordinator(); return make_subscriber(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); } }; template auto operator()(Subscriber dest) const -> decltype(timeout_observer::make(std::move(dest), initial)) { return timeout_observer::make(std::move(dest), initial); } }; } /*! @copydoc rx-timeout.hpp */ template auto timeout(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, rxu::is_duration>, class SourceValue = rxu::value_type_t, class Timeout = rxo::detail::timeout, identity_one_worker>> static auto member(Observable&& o, Duration&& d) -> decltype(o.template lift(Timeout(std::forward(d), identity_current_thread()))) { return o.template lift(Timeout(std::forward(d), identity_current_thread())); } template, is_coordination, rxu::is_duration>, class SourceValue = rxu::value_type_t, class Timeout = rxo::detail::timeout, rxu::decay_t>> static auto member(Observable&& o, Coordination&& cn, Duration&& d) -> decltype(o.template lift(Timeout(std::forward(d), std::forward(cn)))) { return o.template lift(Timeout(std::forward(d), std::forward(cn))); } template, is_coordination, rxu::is_duration>, class SourceValue = rxu::value_type_t, class Timeout = rxo::detail::timeout, rxu::decay_t>> static auto member(Observable&& o, Duration&& d, Coordination&& cn) -> decltype(o.template lift(Timeout(std::forward(d), std::forward(cn)))) { return o.template lift(Timeout(std::forward(d), std::forward(cn))); } template static operators::detail::timeout_invalid_t member(const AN&...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)"); } }; } #endif