1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP)
6 #define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace detail {
13
14 template<class T>
15 struct has_on_connect
16 {
17 struct not_void {};
18 template<class CT>
19 static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
20 template<class CT>
21 static not_void check(...);
22
23 typedef decltype(check<T>(0)) detail_result;
24 static const bool value = std::is_same<detail_result, void>::value;
25 };
26
27 }
28
29 template<class T>
30 class dynamic_connectable_observable
31 : public dynamic_observable<T>
32 {
33 struct state_type
34 : public std::enable_shared_from_this<state_type>
35 {
36 typedef std::function<void(composite_subscription)> onconnect_type;
37
38 onconnect_type on_connect;
39 };
40 std::shared_ptr<state_type> state;
41
42 template<class U>
construct(const dynamic_observable<U> & o,tag_dynamic_observable &&)43 void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
44 state = o.state;
45 }
46
47 template<class U>
construct(dynamic_observable<U> && o,tag_dynamic_observable &&)48 void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
49 state = std::move(o.state);
50 }
51
52 template<class SO>
construct(SO && source,rxs::tag_source &&)53 void construct(SO&& source, rxs::tag_source&&) {
54 auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
55 state->on_connect = [so](composite_subscription cs) mutable {
56 so->on_connect(std::move(cs));
57 };
58 }
59
60 public:
61
62 typedef tag_dynamic_observable dynamic_observable_tag;
63
dynamic_connectable_observable()64 dynamic_connectable_observable()
65 {
66 }
67
68 template<class SOF>
dynamic_connectable_observable(SOF sof)69 explicit dynamic_connectable_observable(SOF sof)
70 : dynamic_observable<T>(sof)
71 , state(std::make_shared<state_type>())
72 {
73 construct(std::move(sof),
74 typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
75 }
76
77 template<class SF, class CF>
dynamic_connectable_observable(SF && sf,CF && cf)78 dynamic_connectable_observable(SF&& sf, CF&& cf)
79 : dynamic_observable<T>(std::forward<SF>(sf))
80 , state(std::make_shared<state_type>())
81 {
82 state->on_connect = std::forward<CF>(cf);
83 }
84
85 using dynamic_observable<T>::on_subscribe;
86
on_connect(composite_subscription cs) const87 void on_connect(composite_subscription cs) const {
88 state->on_connect(std::move(cs));
89 }
90 };
91
92 template<class T, class Source>
make_dynamic_connectable_observable(Source && s)93 connectable_observable<T> make_dynamic_connectable_observable(Source&& s) {
94 return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
95 }
96
97
98 /*!
99 \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
100
101 \ingroup group-observable
102
103 */
104 template<class T, class SourceOperator>
105 class connectable_observable
106 : public observable<T, SourceOperator>
107 {
108 typedef connectable_observable<T, SourceOperator> this_type;
109 typedef observable<T, SourceOperator> base_type;
110 typedef rxu::decay_t<SourceOperator> source_operator_type;
111
112 static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
113
114 public:
115 typedef tag_connectable_observable observable_tag;
116
connectable_observable()117 connectable_observable()
118 {
119 }
120
connectable_observable(const SourceOperator & o)121 explicit connectable_observable(const SourceOperator& o)
122 : base_type(o)
123 {
124 }
connectable_observable(SourceOperator && o)125 explicit connectable_observable(SourceOperator&& o)
126 : base_type(std::move(o))
127 {
128 }
129
130 // implicit conversion between observables of the same value_type
131 template<class SO>
connectable_observable(const connectable_observable<T,SO> & o)132 connectable_observable(const connectable_observable<T, SO>& o)
133 : base_type(o)
134 {}
135 // implicit conversion between observables of the same value_type
136 template<class SO>
connectable_observable(connectable_observable<T,SO> && o)137 connectable_observable(connectable_observable<T, SO>&& o)
138 : base_type(std::move(o))
139 {}
140
141 ///
142 /// takes any function that will take this observable and produce a result value.
143 /// this is intended to allow externally defined operators, that use subscribe,
144 /// to be connected into the expression.
145 ///
146 template<class OperatorFactory>
op(OperatorFactory && of) const147 auto op(OperatorFactory&& of) const
148 -> decltype(of(*(const this_type*)nullptr)) {
149 return of(*this);
150 static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
151 }
152
153 ///
154 /// performs type-forgetting conversion to a new composite_observable
155 ///
as_dynamic()156 connectable_observable<T> as_dynamic() {
157 return *this;
158 }
159
connect(composite_subscription cs=composite_subscription ())160 composite_subscription connect(composite_subscription cs = composite_subscription()) {
161 base_type::source_operator.on_connect(cs);
162 return cs;
163 }
164
165 /*! @copydoc rx-ref_count.hpp
166 */
167 template<class... AN>
ref_count(AN...an) const168 auto ref_count(AN... an) const
169 /// \cond SHOW_SERVICE_MEMBERS
170 -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
171 /// \endcond
172 {
173 return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
174 }
175
176 /*! @copydoc rx-connect_forever.hpp
177 */
178 template<class... AN>
connect_forever(AN...an) const179 auto connect_forever(AN... an) const
180 /// \cond SHOW_SERVICE_MEMBERS
181 -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
182 /// \endcond
183 {
184 return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...);
185 }
186 };
187
188
189 }
190
191 //
192 // support range() >> filter() >> subscribe() syntax
193 // '>>' is spelled 'stream'
194 //
195 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::connectable_observable<T,SourceOperator> & source,OperatorFactory && of)196 auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
197 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
198 return source.op(std::forward<OperatorFactory>(of));
199 }
200
201 //
202 // support range() | filter() | subscribe() syntax
203 // '|' is spelled 'pipe'
204 //
205 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::connectable_observable<T,SourceOperator> & source,OperatorFactory && of)206 auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
207 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
208 return source.op(std::forward<OperatorFactory>(of));
209 }
210
211 #endif
212