// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-timestamp.hpp \brief Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted. \tparam Coordination the type of the scheduler (optional). \param coordination the scheduler to manage timeout for each event (optional). \return Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }. \sample \snippet timestamp.cpp timestamp sample \snippet output.txt timestamp sample */ #if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP) #define RXCPP_OPERATORS_RX_TIMESTAMP_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct timestamp_invalid_arguments {}; template struct timestamp_invalid : public rxo::operator_base> { using type = observable, timestamp_invalid>; }; template using timestamp_invalid_t = typename timestamp_invalid::type; template struct timestamp { typedef rxu::decay_t source_value_type; typedef rxu::decay_t coordination_type; struct timestamp_values { timestamp_values(coordination_type c) : coordination(c) { } coordination_type coordination; }; timestamp_values initial; timestamp(coordination_type coordination) : initial(coordination) { } template struct timestamp_observer { typedef timestamp_observer this_type; typedef source_value_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; coordination_type coord; timestamp_observer(dest_type d, coordination_type coordination) : dest(std::move(d)), coord(std::move(coordination)) { } void on_next(source_value_type v) const { dest.on_next(std::make_pair(v, coord.now())); } void on_error(rxu::error_ptr e) const { dest.on_error(e); } void on_completed() const { dest.on_completed(); } static subscriber make(dest_type d, timestamp_values v) { return make_subscriber(d, this_type(d, v.coordination)); } }; template auto operator()(Subscriber dest) const -> decltype(timestamp_observer::make(std::move(dest), initial)) { return timestamp_observer::make(std::move(dest), initial); } }; } /*! @copydoc rx-timestamp.hpp */ template auto timestamp(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template>, class SourceValue = rxu::value_type_t, class Timestamp = rxo::detail::timestamp, class Clock = typename rxsc::scheduler::clock_type::time_point, class Value = std::pair> static auto member(Observable&& o) -> decltype(o.template lift(Timestamp(identity_current_thread()))) { return o.template lift(Timestamp(identity_current_thread())); } template, is_coordination>, class SourceValue = rxu::value_type_t, class Timestamp = rxo::detail::timestamp>, class Clock = typename rxsc::scheduler::clock_type::time_point, class Value = std::pair> static auto member(Observable&& o, Coordination&& cn) -> decltype(o.template lift(Timestamp(std::forward(cn)))) { return o.template lift(Timestamp(std::forward(cn))); } template static operators::detail::timestamp_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "timestamp takes (optional Coordination)"); } }; } #endif