1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-multicast.hpp
6
7 \brief allows connections to the source to be independent of subscriptions.
8
9 \tparam Subject the subject to multicast the source Observable.
10
11 \param sub the subject.
12 */
13
14 #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
15 #define RXCPP_OPERATORS_RX_MULTICAST_HPP
16
17 #include "../rx-includes.hpp"
18
19 namespace rxcpp {
20
21 namespace operators {
22
23 namespace detail {
24
// Empty tag type that records the argument types of a multicast call; it is
// used as the value type of the multicast_invalid result.
template<class... Args>
struct multicast_invalid_arguments
{
};
27
28 template<class... AN>
29 struct multicast_invalid : public rxo::operator_base<multicast_invalid_arguments<AN...>> {
30 using type = observable<multicast_invalid_arguments<AN...>, multicast_invalid<AN...>>;
31 };
32 template<class... AN>
33 using multicast_invalid_t = typename multicast_invalid<AN...>::type;
34
35 template<class T, class Observable, class Subject>
36 struct multicast : public operator_base<T>
37 {
38 typedef rxu::decay_t<Observable> source_type;
39 typedef rxu::decay_t<Subject> subject_type;
40
41 struct multicast_state : public std::enable_shared_from_this<multicast_state>
42 {
multicast_staterxcpp::operators::detail::multicast::multicast_state43 multicast_state(source_type o, subject_type sub)
44 : source(std::move(o))
45 , subject_value(std::move(sub))
46 {
47 }
48 source_type source;
49 subject_type subject_value;
50 rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
51 };
52
53 std::shared_ptr<multicast_state> state;
54
multicastrxcpp::operators::detail::multicast55 multicast(source_type o, subject_type sub)
56 : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
57 {
58 }
59 template<class Subscriber>
on_subscriberxcpp::operators::detail::multicast60 void on_subscribe(Subscriber&& o) const {
61 state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
62 }
on_connectrxcpp::operators::detail::multicast63 void on_connect(composite_subscription cs) const {
64 if (state->connection.empty()) {
65 auto destination = state->subject_value.get_subscriber();
66
67 // the lifetime of each connect is nested in the subject lifetime
68 state->connection.reset(destination.add(cs));
69
70 auto localState = state;
71
72 // when the connection is finished it should shutdown the connection
73 cs.add(
74 [destination, localState](){
75 if (!localState->connection.empty()) {
76 destination.remove(localState->connection.get());
77 localState->connection.reset();
78 }
79 });
80
81 // use cs not destination for lifetime of subscribe.
82 state->source.subscribe(cs, destination);
83 }
84 }
85 };
86
87 }
88
89 /*! @copydoc rx-multicast.hpp
90 */
91 template<class... AN>
multicast(AN &&...an)92 auto multicast(AN&&... an)
93 -> operator_factory<multicast_tag, AN...> {
94 return operator_factory<multicast_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
95 }
96
97 }
98
99 template<>
100 struct member_overload<multicast_tag>
101 {
102 template<class Observable, class Subject,
103 class Enabled = rxu::enable_if_all_true_type_t<
104 is_observable<Observable>>,
105 class SourceValue = rxu::value_type_t<Observable>,
106 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Subject>>,
107 class Value = rxu::value_type_t<Multicast>,
108 class Result = connectable_observable<Value, Multicast>>
memberrxcpp::member_overload109 static Result member(Observable&& o, Subject&& sub) {
110 return Result(Multicast(std::forward<Observable>(o), std::forward<Subject>(sub)));
111 }
112
113 template<class... AN>
memberrxcpp::member_overload114 static operators::detail::multicast_invalid_t<AN...> member(AN...) {
115 std::terminate();
116 return {};
117 static_assert(sizeof...(AN) == 10000, "multicast takes (Subject)");
118 }
119 };
120
121 }
122
123 #endif
124