// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-multicast.hpp \brief allows connections to the source to be independent of subscriptions. \tparam Subject the subject to multicast the source Observable. \param sub the subject. */ #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP) #define RXCPP_OPERATORS_RX_MULTICAST_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct multicast_invalid_arguments {}; template struct multicast_invalid : public rxo::operator_base> { using type = observable, multicast_invalid>; }; template using multicast_invalid_t = typename multicast_invalid::type; template struct multicast : public operator_base { typedef rxu::decay_t source_type; typedef rxu::decay_t subject_type; struct multicast_state : public std::enable_shared_from_this { multicast_state(source_type o, subject_type sub) : source(std::move(o)) , subject_value(std::move(sub)) { } source_type source; subject_type subject_value; rxu::detail::maybe connection; }; std::shared_ptr state; multicast(source_type o, subject_type sub) : state(std::make_shared(std::move(o), std::move(sub))) { } template void on_subscribe(Subscriber&& o) const { state->subject_value.get_observable().subscribe(std::forward(o)); } void on_connect(composite_subscription cs) const { if (state->connection.empty()) { auto destination = state->subject_value.get_subscriber(); // the lifetime of each connect is nested in the subject lifetime state->connection.reset(destination.add(cs)); auto localState = state; // when the connection is finished it should shutdown the connection cs.add( [destination, localState](){ if (!localState->connection.empty()) { destination.remove(localState->connection.get()); localState->connection.reset(); } }); // use cs not destination for lifetime of subscribe. state->source.subscribe(cs, destination); } } }; } /*! @copydoc rx-multicast.hpp */ template auto multicast(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template>, class SourceValue = rxu::value_type_t, class Multicast = rxo::detail::multicast, rxu::decay_t>, class Value = rxu::value_type_t, class Result = connectable_observable> static Result member(Observable&& o, Subject&& sub) { return Result(Multicast(std::forward(o), std::forward(sub))); } template static operators::detail::multicast_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)"); } }; } #endif