1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-publish.hpp
6
\brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
When an initial item \c first is supplied (behavior subject): turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.
9
10 \tparam T the type of the emitted item (optional).
11
12 \param first an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
13 \param cs the subscription to control lifetime (optional).
14
15 \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.
16
17 \sample
18 \snippet publish.cpp publish subject sample
19 \snippet output.txt publish subject sample
20
21 \sample
22 \snippet publish.cpp publish behavior sample
23 \snippet output.txt publish behavior sample
24
25 \sample
26 \snippet publish.cpp publish diamond samethread sample
27 \snippet output.txt publish diamond samethread sample
28
29 \sample
30 \snippet publish.cpp publish diamond bgthread sample
31 \snippet output.txt publish diamond bgthread sample
32
33 \sample
34 \snippet ref_count.cpp ref_count other diamond sample
35 \snippet output.txt ref_count other diamond sample
36 */
37
38 #if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
39 #define RXCPP_OPERATORS_RX_PUBLISH_HPP
40
41 #include "../rx-includes.hpp"
42 #include "./rx-multicast.hpp"
43
44 namespace rxcpp {
45
46 namespace operators {
47
48 namespace detail {
49
// Empty tag type that records the argument types of a rejected publish call.
// It serves as the value type of the placeholder observable named by
// publish_invalid.
template<typename... AN>
struct publish_invalid_arguments
{
};
52
53 template<class... AN>
54 struct publish_invalid : public rxo::operator_base<publish_invalid_arguments<AN...>> {
55 using type = observable<publish_invalid_arguments<AN...>, publish_invalid<AN...>>;
56 };
57 template<class... AN>
58 using publish_invalid_t = typename publish_invalid<AN...>::type;
59
60 }
61
62 /*! @copydoc rx-publish.hpp
63 */
64 template<class... AN>
publish(AN &&...an)65 auto publish(AN&&... an)
66 -> operator_factory<publish_tag, AN...> {
67 return operator_factory<publish_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
68 }
69
70 /*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
71
72 \tparam Coordination the type of the scheduler.
73
74 \param cn a scheduler all values are queued and delivered on.
75 \param cs the subscription to control lifetime (optional).
76
77 \return rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
78
79 \sample
80 \snippet publish.cpp publish_synchronized sample
81 \snippet output.txt publish_synchronized sample
82 */
83 template<class... AN>
publish_synchronized(AN &&...an)84 auto publish_synchronized(AN&&... an)
85 -> operator_factory<publish_synchronized_tag, AN...> {
86 return operator_factory<publish_synchronized_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
87 }
88
89 }
90
91 template<>
92 struct member_overload<publish_tag>
93 {
94 template<class Observable,
95 class Enabled = rxu::enable_if_all_true_type_t<
96 is_observable<Observable>>,
97 class SourceValue = rxu::value_type_t<Observable>,
98 class Subject = rxsub::subject<SourceValue>,
99 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
100 class Result = connectable_observable<SourceValue, Multicast>
101 >
memberrxcpp::member_overload102 static Result member(Observable&& o) {
103 return Result(Multicast(std::forward<Observable>(o), Subject(composite_subscription())));
104 }
105
106 template<class Observable,
107 class Enabled = rxu::enable_if_all_true_type_t<
108 is_observable<Observable>>,
109 class SourceValue = rxu::value_type_t<Observable>,
110 class Subject = rxsub::subject<SourceValue>,
111 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
112 class Result = connectable_observable<SourceValue, Multicast>
113 >
memberrxcpp::member_overload114 static Result member(Observable&& o, composite_subscription cs) {
115 return Result(Multicast(std::forward<Observable>(o), Subject(cs)));
116 }
117
118 template<class Observable, class T,
119 class Enabled = rxu::enable_if_all_true_type_t<
120 is_observable<Observable>>,
121 class SourceValue = rxu::value_type_t<Observable>,
122 class Subject = rxsub::behavior<SourceValue>,
123 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
124 class Result = connectable_observable<SourceValue, Multicast>
125 >
memberrxcpp::member_overload126 static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) {
127 return Result(Multicast(std::forward<Observable>(o), Subject(first, cs)));
128 }
129
130 template<class... AN>
memberrxcpp::member_overload131 static operators::detail::publish_invalid_t<AN...> member(AN...) {
132 std::terminate();
133 return {};
134 static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)");
135 }
136 };
137
138 template<>
139 struct member_overload<publish_synchronized_tag>
140 {
141 template<class Observable, class Coordination,
142 class Enabled = rxu::enable_if_all_true_type_t<
143 is_observable<Observable>,
144 is_coordination<Coordination>>,
145 class SourceValue = rxu::value_type_t<Observable>,
146 class Subject = rxsub::synchronize<SourceValue, rxu::decay_t<Coordination>>,
147 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
148 class Result = connectable_observable<SourceValue, Multicast>
149 >
memberrxcpp::member_overload150 static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
151 return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
152 }
153
154 template<class... AN>
memberrxcpp::member_overload155 static operators::detail::publish_invalid_t<AN...> member(AN...) {
156 std::terminate();
157 return {};
158 static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)");
159 }
160 };
161
162 }
163
164 #endif
165