// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once #if !defined(RXCPP_RX_BEHAVIOR_HPP) #define RXCPP_RX_BEHAVIOR_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace subjects { namespace detail { template class behavior_observer : public detail::multicast_observer { typedef behavior_observer this_type; typedef detail::multicast_observer base_type; class behavior_observer_state : public std::enable_shared_from_this { mutable std::mutex lock; mutable T value; public: behavior_observer_state(T first) : value(first) { } void reset(T v) const { std::unique_lock guard(lock); value = std::move(v); } T get() const { std::unique_lock guard(lock); return value; } }; std::shared_ptr state; public: behavior_observer(T f, composite_subscription l) : base_type(l) , state(std::make_shared(std::move(f))) { } subscriber get_subscriber() const { return make_subscriber(this->get_id(), this->get_subscription(), observer>(*this)).as_dynamic(); } T get_value() const { return state->get(); } template void on_next(V v) const { state->reset(v); base_type::on_next(std::move(v)); } }; } template class behavior { detail::behavior_observer s; public: explicit behavior(T f, composite_subscription cs = composite_subscription()) : s(std::move(f), cs) { } bool has_observers() const { return s.has_observers(); } T get_value() const { return s.get_value(); } subscriber get_subscriber() const { return s.get_subscriber(); } observable get_observable() const { auto keepAlive = s; return make_observable_dynamic([=](subscriber o){ if (keepAlive.get_subscription().is_subscribed()) { o.on_next(get_value()); } keepAlive.add(s.get_subscriber(), std::move(o)); }); } }; } } #endif