1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-window_toggle.hpp
6
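\brief Return an observable that emits a new observable (a window) each time the openings observable emits a value; each window collects items from this observable until the observable returned by the closing selector for that opening emits its first value or completes, on the specified scheduler.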
8
9 \tparam Openings observable<OT>
10 \tparam ClosingSelector a function of type observable<CT>(OT)
11 \tparam Coordination the type of the scheduler (optional).
12
13 \param opens each value from this observable opens a new window.
14 \param closes this function is called for each opened window and returns an observable. the first value from the returned observable will close the window.
15 \param coordination the scheduler for the windows (optional).
16
17 \return Observable that emits an observable for each opened window.
18
19 \sample
20 \snippet window.cpp window toggle+coordination sample
21 \snippet output.txt window toggle+coordination sample
22
23 \sample
24 \snippet window.cpp window toggle sample
25 \snippet output.txt window toggle sample
26 */
27
28 #if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP)
29 #define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP
30
31 #include "../rx-includes.hpp"
32
33 namespace rxcpp {
34
35 namespace operators {
36
37 namespace detail {
38
39 template<class... AN>
40 struct window_toggle_invalid_arguments {};
41
42 template<class... AN>
43 struct window_toggle_invalid : public rxo::operator_base<window_toggle_invalid_arguments<AN...>> {
44 using type = observable<window_toggle_invalid_arguments<AN...>, window_toggle_invalid<AN...>>;
45 };
46 template<class... AN>
47 using window_toggle_invalid_t = typename window_toggle_invalid<AN...>::type;
48
49 template<class T, class Openings, class ClosingSelector, class Coordination>
50 struct window_toggle
51 {
52 typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
53
54 using source_value_type = rxu::decay_t<T>;
55 using coordination_type = rxu::decay_t<Coordination>;
56 using coordinator_type = typename coordination_type::coordinator_type;
57 using openings_type = rxu::decay_t<Openings>;
58 using openings_value_type = typename openings_type::value_type;
59 using closing_selector_type = rxu::decay_t<ClosingSelector>;
60 using closings_type = rxu::result_of_t<closing_selector_type(openings_value_type)>;
61 using closings_value_type = typename closings_type::value_type;
62
63 struct window_toggle_values
64 {
window_toggle_valuesrxcpp::operators::detail::window_toggle::window_toggle_values65 window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
66 : openings(opens)
67 , closingSelector(closes)
68 , coordination(c)
69 {
70 }
71 openings_type openings;
72 mutable closing_selector_type closingSelector;
73 coordination_type coordination;
74 };
75 window_toggle_values initial;
76
window_togglerxcpp::operators::detail::window_toggle77 window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
78 : initial(opens, closes, coordination)
79 {
80 }
81
82 template<class Subscriber>
83 struct window_toggle_observer
84 {
85 typedef window_toggle_observer<Subscriber> this_type;
86 typedef rxu::decay_t<T> value_type;
87 typedef rxu::decay_t<Subscriber> dest_type;
88 typedef observer<T, this_type> observer_type;
89
90 struct window_toggle_subscriber_values : public window_toggle_values
91 {
window_toggle_subscriber_valuesrxcpp::operators::detail::window_toggle::window_toggle_observer::window_toggle_subscriber_values92 window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
93 : window_toggle_values(v)
94 , cs(std::move(cs))
95 , dest(std::move(d))
96 , coordinator(std::move(c))
97 , worker(coordinator.get_worker())
98 {
99 }
100 composite_subscription cs;
101 dest_type dest;
102 coordinator_type coordinator;
103 rxsc::worker worker;
104 mutable std::list<rxcpp::subjects::subject<T>> subj;
105 };
106 std::shared_ptr<window_toggle_subscriber_values> state;
107
window_toggle_observerrxcpp::operators::detail::window_toggle::window_toggle_observer108 window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
109 : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
110 {
111 auto localState = state;
112
113 composite_subscription innercs;
114
115 // when the out observer is unsubscribed all the
116 // inner subscriptions are unsubscribed as well
117 auto innerscope = localState->dest.add(innercs);
118
119 innercs.add([=](){
120 localState->dest.remove(innerscope);
121 });
122
123 localState->dest.add(localState->cs);
124
125 auto source = on_exception(
126 [&](){return localState->coordinator.in(localState->openings);},
127 localState->dest);
128 if (source.empty()) {
129 return;
130 }
131
132 // this subscribe does not share the observer subscription
133 // so that when it is unsubscribed the observer can be called
134 // until the inner subscriptions have finished
135 auto sink = make_subscriber<openings_value_type>(
136 localState->dest,
137 innercs,
138 // on_next
139 [localState](const openings_value_type& ov) {
140 auto closer = localState->closingSelector(ov);
141
142 auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>());
143 localState->dest.on_next(it->get_observable().as_dynamic());
144
145 composite_subscription innercs;
146
147 // when the out observer is unsubscribed all the
148 // inner subscriptions are unsubscribed as well
149 auto innerscope = localState->dest.add(innercs);
150
151 innercs.add([=](){
152 localState->dest.remove(innerscope);
153 });
154
155 auto source = localState->coordinator.in(closer);
156
157 auto sit = std::make_shared<decltype(it)>(it);
158 auto close = [localState, sit]() {
159 auto it = *sit;
160 *sit = localState->subj.end();
161 if (it != localState->subj.end()) {
162 it->get_subscriber().on_completed();
163 localState->subj.erase(it);
164 }
165 };
166
167 // this subscribe does not share the observer subscription
168 // so that when it is unsubscribed the observer can be called
169 // until the inner subscriptions have finished
170 auto sink = make_subscriber<closings_value_type>(
171 localState->dest,
172 innercs,
173 // on_next
174 [close, innercs](closings_value_type) {
175 close();
176 innercs.unsubscribe();
177 },
178 // on_error
179 [localState](rxu::error_ptr e) {
180 localState->dest.on_error(e);
181 },
182 // on_completed
183 close
184 );
185 auto selectedSink = localState->coordinator.out(sink);
186 source.subscribe(std::move(selectedSink));
187 },
188 // on_error
189 [localState](rxu::error_ptr e) {
190 localState->dest.on_error(e);
191 },
192 // on_completed
193 []() {
194 }
195 );
196 auto selectedSink = on_exception(
197 [&](){return localState->coordinator.out(sink);},
198 localState->dest);
199 if (selectedSink.empty()) {
200 return;
201 }
202 source->subscribe(std::move(selectedSink.get()));
203 }
204
on_nextrxcpp::operators::detail::window_toggle::window_toggle_observer205 void on_next(T v) const {
206 auto localState = state;
207 auto work = [v, localState](const rxsc::schedulable&){
208 for (auto s : localState->subj) {
209 s.get_subscriber().on_next(v);
210 }
211 };
212 auto selectedWork = on_exception(
213 [&](){return localState->coordinator.act(work);},
214 localState->dest);
215 if (selectedWork.empty()) {
216 return;
217 }
218 localState->worker.schedule(selectedWork.get());
219 }
220
on_errorrxcpp::operators::detail::window_toggle::window_toggle_observer221 void on_error(rxu::error_ptr e) const {
222 auto localState = state;
223 auto work = [e, localState](const rxsc::schedulable&){
224 for (auto s : localState->subj) {
225 s.get_subscriber().on_error(e);
226 }
227 localState->dest.on_error(e);
228 };
229 auto selectedWork = on_exception(
230 [&](){return localState->coordinator.act(work);},
231 localState->dest);
232 if (selectedWork.empty()) {
233 return;
234 }
235 localState->worker.schedule(selectedWork.get());
236 }
237
on_completedrxcpp::operators::detail::window_toggle::window_toggle_observer238 void on_completed() const {
239 auto localState = state;
240 auto work = [localState](const rxsc::schedulable&){
241 for (auto s : localState->subj) {
242 s.get_subscriber().on_completed();
243 }
244 localState->dest.on_completed();
245 };
246 auto selectedWork = on_exception(
247 [&](){return localState->coordinator.act(work);},
248 localState->dest);
249 if (selectedWork.empty()) {
250 return;
251 }
252 localState->worker.schedule(selectedWork.get());
253 }
254
makerxcpp::operators::detail::window_toggle::window_toggle_observer255 static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
256 auto cs = composite_subscription();
257 auto coordinator = v.coordination.create_coordinator(d.get_subscription());
258
259 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
260 }
261 };
262
263 template<class Subscriber>
operator ()rxcpp::operators::detail::window_toggle264 auto operator()(Subscriber dest) const
265 -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
266 return window_toggle_observer<Subscriber>::make(std::move(dest), initial);
267 }
268 };
269
270 }
271
272 /*! @copydoc rx-window_toggle.hpp
273 */
274 template<class... AN>
window_toggle(AN &&...an)275 auto window_toggle(AN&&... an)
276 -> operator_factory<window_toggle_tag, AN...> {
277 return operator_factory<window_toggle_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
278 }
279
280 }
281
282 template<>
283 struct member_overload<window_toggle_tag>
284 {
285 template<class Observable, class Openings, class ClosingSelector,
286 class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
287 class OpeningsType = rxu::decay_t<Openings>,
288 class OpeningsValueType = typename OpeningsType::value_type,
289 class Enabled = rxu::enable_if_all_true_type_t<
290 all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>>,
291 class SourceValue = rxu::value_type_t<Observable>,
292 class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, identity_one_worker>,
293 class Value = observable<SourceValue>>
memberrxcpp::member_overload294 static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector)
295 -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()))) {
296 return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()));
297 }
298
299 template<class Observable, class Openings, class ClosingSelector, class Coordination,
300 class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
301 class OpeningsType = rxu::decay_t<Openings>,
302 class OpeningsValueType = typename OpeningsType::value_type,
303 class Enabled = rxu::enable_if_all_true_type_t<
304 all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>,
305 is_coordination<Coordination>>,
306 class SourceValue = rxu::value_type_t<Observable>,
307 class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, rxu::decay_t<Coordination>>,
308 class Value = observable<SourceValue>>
memberrxcpp::member_overload309 static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector, Coordination&& cn)
310 -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)))) {
311 return o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)));
312 }
313
314 template<class... AN>
memberrxcpp::member_overload315 static operators::detail::window_toggle_invalid_t<AN...> member(AN...) {
316 std::terminate();
317 return {};
318 static_assert(sizeof...(AN) == 10000, "window_toggle takes (Openings, ClosingSelector, optional Coordination)");
319 }
320 };
321
322 }
323
324 #endif
325