1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-window_time.hpp
6
7 \brief Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
8 If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
9
10 \tparam Duration the type of time intervals.
11 \tparam Coordination the type of the scheduler (optional).
12
13 \param period the period of time each window collects items before it is completed.
14 \param skip the period of time after which a new window will be created.
15 \param coordination the scheduler for the windows (optional).
16
17 \return Observable that emits observables every period time interval and collect items from this observable for period of time into each produced observable.
18 If the skip parameter is set, return an Observable that emits observables every skip time interval and collect items from this observable for period of time into each produced observable.
19
20 \sample
21 \snippet window.cpp window period+skip+coordination sample
22 \snippet output.txt window period+skip+coordination sample
23
24 \sample
25 \snippet window.cpp window period+skip sample
26 \snippet output.txt window period+skip sample
27
28 \sample
29 \snippet window.cpp window period+coordination sample
30 \snippet output.txt window period+coordination sample
31
32 \sample
33 \snippet window.cpp window period sample
34 \snippet output.txt window period sample
35 */
36
37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP)
38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP
39
40 #include "../rx-includes.hpp"
41
42 namespace rxcpp {
43
44 namespace operators {
45
46 namespace detail {
47
48 template<class... AN>
49 struct window_with_time_invalid_arguments {};
50
51 template<class... AN>
52 struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
53 using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>;
54 };
55 template<class... AN>
56 using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type;
57
58 template<class T, class Duration, class Coordination>
59 struct window_with_time
60 {
61 typedef rxu::decay_t<T> source_value_type;
62 typedef observable<source_value_type> value_type;
63 typedef rxu::decay_t<Coordination> coordination_type;
64 typedef typename coordination_type::coordinator_type coordinator_type;
65 typedef rxu::decay_t<Duration> duration_type;
66
67 struct window_with_time_values
68 {
window_with_time_valuesrxcpp::operators::detail::window_with_time::window_with_time_values69 window_with_time_values(duration_type p, duration_type s, coordination_type c)
70 : period(p)
71 , skip(s)
72 , coordination(c)
73 {
74 }
75 duration_type period;
76 duration_type skip;
77 coordination_type coordination;
78 };
79 window_with_time_values initial;
80
window_with_timerxcpp::operators::detail::window_with_time81 window_with_time(duration_type period, duration_type skip, coordination_type coordination)
82 : initial(period, skip, coordination)
83 {
84 }
85
86 template<class Subscriber>
87 struct window_with_time_observer
88 {
89 typedef window_with_time_observer<Subscriber> this_type;
90 typedef rxu::decay_t<T> value_type;
91 typedef rxu::decay_t<Subscriber> dest_type;
92 typedef observer<T, this_type> observer_type;
93
94 struct window_with_time_subscriber_values : public window_with_time_values
95 {
window_with_time_subscriber_valuesrxcpp::operators::detail::window_with_time::window_with_time_observer::window_with_time_subscriber_values96 window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
97 : window_with_time_values(v)
98 , cs(std::move(cs))
99 , dest(std::move(d))
100 , coordinator(std::move(c))
101 , worker(coordinator.get_worker())
102 , expected(worker.now())
103 {
104 }
105 composite_subscription cs;
106 dest_type dest;
107 coordinator_type coordinator;
108 rxsc::worker worker;
109 mutable std::deque<rxcpp::subjects::subject<T>> subj;
110 rxsc::scheduler::clock_type::time_point expected;
111 };
112 std::shared_ptr<window_with_time_subscriber_values> state;
113
window_with_time_observerrxcpp::operators::detail::window_with_time::window_with_time_observer114 window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
115 : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
116 {
117 auto localState = state;
118
119 auto disposer = [=](const rxsc::schedulable&){
120 localState->cs.unsubscribe();
121 localState->dest.unsubscribe();
122 localState->worker.unsubscribe();
123 };
124 auto selectedDisposer = on_exception(
125 [&](){return localState->coordinator.act(disposer);},
126 localState->dest);
127 if (selectedDisposer.empty()) {
128 return;
129 }
130
131 localState->dest.add([=](){
132 localState->worker.schedule(selectedDisposer.get());
133 });
134 localState->cs.add([=](){
135 localState->worker.schedule(selectedDisposer.get());
136 });
137
138 //
139 // The scheduler is FIFO for any time T. Since the observer is scheduling
140 // on_next/on_error/oncompleted the timed schedule calls must be resheduled
141 // when they occur to ensure that production happens after on_next/on_error/oncompleted
142 //
143
144 auto release_window = [localState](const rxsc::schedulable&) {
145 localState->worker.schedule([localState](const rxsc::schedulable&) {
146 localState->subj[0].get_subscriber().on_completed();
147 localState->subj.pop_front();
148 });
149 };
150 auto selectedRelease = on_exception(
151 [&](){return localState->coordinator.act(release_window);},
152 localState->dest);
153 if (selectedRelease.empty()) {
154 return;
155 }
156
157 auto create_window = [localState, selectedRelease](const rxsc::schedulable&) {
158 localState->subj.push_back(rxcpp::subjects::subject<T>());
159 localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic());
160
161 auto produce_at = localState->expected + localState->period;
162 localState->expected += localState->skip;
163 localState->worker.schedule(produce_at, [localState, selectedRelease](const rxsc::schedulable&) {
164 localState->worker.schedule(selectedRelease.get());
165 });
166 };
167 auto selectedCreate = on_exception(
168 [&](){return localState->coordinator.act(create_window);},
169 localState->dest);
170 if (selectedCreate.empty()) {
171 return;
172 }
173
174 state->worker.schedule_periodically(
175 state->expected,
176 state->skip,
177 [localState, selectedCreate](const rxsc::schedulable&) {
178 localState->worker.schedule(selectedCreate.get());
179 });
180 }
181
on_nextrxcpp::operators::detail::window_with_time::window_with_time_observer182 void on_next(T v) const {
183 auto localState = state;
184 auto work = [v, localState](const rxsc::schedulable&){
185 for (auto s : localState->subj) {
186 s.get_subscriber().on_next(v);
187 }
188 };
189 auto selectedWork = on_exception(
190 [&](){return localState->coordinator.act(work);},
191 localState->dest);
192 if (selectedWork.empty()) {
193 return;
194 }
195 localState->worker.schedule(selectedWork.get());
196 }
197
on_errorrxcpp::operators::detail::window_with_time::window_with_time_observer198 void on_error(rxu::error_ptr e) const {
199 auto localState = state;
200 auto work = [e, localState](const rxsc::schedulable&){
201 for (auto s : localState->subj) {
202 s.get_subscriber().on_error(e);
203 }
204 localState->dest.on_error(e);
205 };
206 auto selectedWork = on_exception(
207 [&](){return localState->coordinator.act(work);},
208 localState->dest);
209 if (selectedWork.empty()) {
210 return;
211 }
212 localState->worker.schedule(selectedWork.get());
213 }
214
on_completedrxcpp::operators::detail::window_with_time::window_with_time_observer215 void on_completed() const {
216 auto localState = state;
217 auto work = [localState](const rxsc::schedulable&){
218 for (auto s : localState->subj) {
219 s.get_subscriber().on_completed();
220 }
221 localState->dest.on_completed();
222 };
223 auto selectedWork = on_exception(
224 [&](){return localState->coordinator.act(work);},
225 localState->dest);
226 if (selectedWork.empty()) {
227 return;
228 }
229 localState->worker.schedule(selectedWork.get());
230 }
231
makerxcpp::operators::detail::window_with_time::window_with_time_observer232 static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) {
233 auto cs = composite_subscription();
234 auto coordinator = v.coordination.create_coordinator();
235
236 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
237 }
238 };
239
240 template<class Subscriber>
operator ()rxcpp::operators::detail::window_with_time241 auto operator()(Subscriber dest) const
242 -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
243 return window_with_time_observer<Subscriber>::make(std::move(dest), initial);
244 }
245 };
246
247 }
248
249 /*! @copydoc rx-window_time.hpp
250 */
251 template<class... AN>
window_with_time(AN &&...an)252 auto window_with_time(AN&&... an)
253 -> operator_factory<window_with_time_tag, AN...> {
254 return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
255 }
256
257 }
258
259 template<>
260 struct member_overload<window_with_time_tag>
261 {
262 template<class Observable, class Duration,
263 class Enabled = rxu::enable_if_all_true_type_t<
264 is_observable<Observable>,
265 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
266 class SourceValue = rxu::value_type_t<Observable>,
267 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
268 class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload269 static auto member(Observable&& o, Duration period)
270 -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) {
271 return o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()));
272 }
273
274 template<class Observable, class Duration, class Coordination,
275 class Enabled = rxu::enable_if_all_true_type_t<
276 is_observable<Observable>,
277 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
278 is_coordination<Coordination>>,
279 class SourceValue = rxu::value_type_t<Observable>,
280 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
281 class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload282 static auto member(Observable&& o, Duration period, Coordination&& cn)
283 -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
284 return o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
285 }
286
287 template<class Observable, class Duration,
288 class Enabled = rxu::enable_if_all_true_type_t<
289 is_observable<Observable>,
290 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
291 class SourceValue = rxu::value_type_t<Observable>,
292 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
293 class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload294 static auto member(Observable&& o, Duration&& period, Duration&& skip)
295 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
296 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
297 }
298
299 template<class Observable, class Duration, class Coordination,
300 class Enabled = rxu::enable_if_all_true_type_t<
301 is_observable<Observable>,
302 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
303 is_coordination<Coordination>>,
304 class SourceValue = rxu::value_type_t<Observable>,
305 class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
306 class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload307 static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
308 -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
309 return o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
310 }
311
312 template<class... AN>
memberrxcpp::member_overload313 static operators::detail::window_with_time_invalid_t<AN...> member(AN...) {
314 std::terminate();
315 return {};
316 static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)");
317 }
318 };
319
320 }
321
322 #endif
323