• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-window_time.hpp
6 
7     \brief Return an observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
8            If the skip parameter is set, return an observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable, on the specified scheduler.
9 
10     \tparam Duration      the type of time intervals.
11     \tparam Coordination  the type of the scheduler (optional).
12 
13     \param period        the period of time each window collects items before it is completed.
14     \param skip          the period of time after which a new window will be created.
15     \param coordination  the scheduler for the windows (optional).
16 
17     \return  Observable that emits observables every period time interval and collects items from this observable for period of time into each produced observable.
18              If the skip parameter is set, return an Observable that emits observables every skip time interval and collects items from this observable for period of time into each produced observable.
19 
20     \sample
21     \snippet window.cpp window period+skip+coordination sample
22     \snippet output.txt window period+skip+coordination sample
23 
24     \sample
25     \snippet window.cpp window period+skip sample
26     \snippet output.txt window period+skip sample
27 
28     \sample
29     \snippet window.cpp window period+coordination sample
30     \snippet output.txt window period+coordination sample
31 
32     \sample
33     \snippet window.cpp window period sample
34     \snippet output.txt window period sample
35 */
36 
37 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP)
38 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_HPP
39 
40 #include "../rx-includes.hpp"
41 
42 namespace rxcpp {
43 
44 namespace operators {
45 
46 namespace detail {
47 
// Empty tag type that records the argument types of a rejected
// window_with_time call; it serves as the value type of the observable
// named by window_with_time_invalid.
template<typename... AN>
struct window_with_time_invalid_arguments
{
};
50 
51 template<class... AN>
52 struct window_with_time_invalid : public rxo::operator_base<window_with_time_invalid_arguments<AN...>> {
53     using type = observable<window_with_time_invalid_arguments<AN...>, window_with_time_invalid<AN...>>;
54 };
55 template<class... AN>
56 using window_with_time_invalid_t = typename window_with_time_invalid<AN...>::type;
57 
58 template<class T, class Duration, class Coordination>
59 struct window_with_time
60 {
61     typedef rxu::decay_t<T> source_value_type;
62     typedef observable<source_value_type> value_type;
63     typedef rxu::decay_t<Coordination> coordination_type;
64     typedef typename coordination_type::coordinator_type coordinator_type;
65     typedef rxu::decay_t<Duration> duration_type;
66 
67     struct window_with_time_values
68     {
window_with_time_valuesrxcpp::operators::detail::window_with_time::window_with_time_values69         window_with_time_values(duration_type p, duration_type s, coordination_type c)
70             : period(p)
71             , skip(s)
72             , coordination(c)
73         {
74         }
75         duration_type period;
76         duration_type skip;
77         coordination_type coordination;
78     };
79     window_with_time_values initial;
80 
window_with_timerxcpp::operators::detail::window_with_time81     window_with_time(duration_type period, duration_type skip, coordination_type coordination)
82         : initial(period, skip, coordination)
83     {
84     }
85 
86     template<class Subscriber>
87     struct window_with_time_observer
88     {
89         typedef window_with_time_observer<Subscriber> this_type;
90         typedef rxu::decay_t<T> value_type;
91         typedef rxu::decay_t<Subscriber> dest_type;
92         typedef observer<T, this_type> observer_type;
93 
94         struct window_with_time_subscriber_values : public window_with_time_values
95         {
window_with_time_subscriber_valuesrxcpp::operators::detail::window_with_time::window_with_time_observer::window_with_time_subscriber_values96             window_with_time_subscriber_values(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
97                 : window_with_time_values(v)
98                 , cs(std::move(cs))
99                 , dest(std::move(d))
100                 , coordinator(std::move(c))
101                 , worker(coordinator.get_worker())
102                 , expected(worker.now())
103             {
104             }
105             composite_subscription cs;
106             dest_type dest;
107             coordinator_type coordinator;
108             rxsc::worker worker;
109             mutable std::deque<rxcpp::subjects::subject<T>> subj;
110             rxsc::scheduler::clock_type::time_point expected;
111         };
112         std::shared_ptr<window_with_time_subscriber_values> state;
113 
window_with_time_observerrxcpp::operators::detail::window_with_time::window_with_time_observer114         window_with_time_observer(composite_subscription cs, dest_type d, window_with_time_values v, coordinator_type c)
115             : state(std::make_shared<window_with_time_subscriber_values>(window_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
116         {
117             auto localState = state;
118 
119             auto disposer = [=](const rxsc::schedulable&){
120                 localState->cs.unsubscribe();
121                 localState->dest.unsubscribe();
122                 localState->worker.unsubscribe();
123             };
124             auto selectedDisposer = on_exception(
125                 [&](){return localState->coordinator.act(disposer);},
126                 localState->dest);
127             if (selectedDisposer.empty()) {
128                 return;
129             }
130 
131             localState->dest.add([=](){
132                 localState->worker.schedule(selectedDisposer.get());
133             });
134             localState->cs.add([=](){
135                 localState->worker.schedule(selectedDisposer.get());
136             });
137 
138             //
139             // The scheduler is FIFO for any time T. Since the observer is scheduling
140             // on_next/on_error/oncompleted the timed schedule calls must be resheduled
141             // when they occur to ensure that production happens after on_next/on_error/oncompleted
142             //
143 
144             auto release_window = [localState](const rxsc::schedulable&) {
145                 localState->worker.schedule([localState](const rxsc::schedulable&) {
146                     localState->subj[0].get_subscriber().on_completed();
147                     localState->subj.pop_front();
148                 });
149             };
150             auto selectedRelease = on_exception(
151                 [&](){return localState->coordinator.act(release_window);},
152                 localState->dest);
153             if (selectedRelease.empty()) {
154                 return;
155             }
156 
157             auto create_window = [localState, selectedRelease](const rxsc::schedulable&) {
158                 localState->subj.push_back(rxcpp::subjects::subject<T>());
159                 localState->dest.on_next(localState->subj[localState->subj.size() - 1].get_observable().as_dynamic());
160 
161                 auto produce_at = localState->expected + localState->period;
162                 localState->expected += localState->skip;
163                 localState->worker.schedule(produce_at, [localState, selectedRelease](const rxsc::schedulable&) {
164                     localState->worker.schedule(selectedRelease.get());
165                 });
166             };
167             auto selectedCreate = on_exception(
168                 [&](){return localState->coordinator.act(create_window);},
169                 localState->dest);
170             if (selectedCreate.empty()) {
171                 return;
172             }
173 
174             state->worker.schedule_periodically(
175                 state->expected,
176                 state->skip,
177                 [localState, selectedCreate](const rxsc::schedulable&) {
178                     localState->worker.schedule(selectedCreate.get());
179                 });
180         }
181 
on_nextrxcpp::operators::detail::window_with_time::window_with_time_observer182         void on_next(T v) const {
183             auto localState = state;
184             auto work = [v, localState](const rxsc::schedulable&){
185                 for (auto s : localState->subj) {
186                     s.get_subscriber().on_next(v);
187                 }
188             };
189             auto selectedWork = on_exception(
190                 [&](){return localState->coordinator.act(work);},
191                 localState->dest);
192             if (selectedWork.empty()) {
193                 return;
194             }
195             localState->worker.schedule(selectedWork.get());
196         }
197 
on_errorrxcpp::operators::detail::window_with_time::window_with_time_observer198         void on_error(rxu::error_ptr e) const {
199             auto localState = state;
200             auto work = [e, localState](const rxsc::schedulable&){
201                 for (auto s : localState->subj) {
202                     s.get_subscriber().on_error(e);
203                 }
204                 localState->dest.on_error(e);
205             };
206             auto selectedWork = on_exception(
207                 [&](){return localState->coordinator.act(work);},
208                 localState->dest);
209             if (selectedWork.empty()) {
210                 return;
211             }
212             localState->worker.schedule(selectedWork.get());
213         }
214 
on_completedrxcpp::operators::detail::window_with_time::window_with_time_observer215         void on_completed() const {
216             auto localState = state;
217             auto work = [localState](const rxsc::schedulable&){
218                 for (auto s : localState->subj) {
219                     s.get_subscriber().on_completed();
220                 }
221                 localState->dest.on_completed();
222             };
223             auto selectedWork = on_exception(
224                 [&](){return localState->coordinator.act(work);},
225                 localState->dest);
226             if (selectedWork.empty()) {
227                 return;
228             }
229             localState->worker.schedule(selectedWork.get());
230         }
231 
makerxcpp::operators::detail::window_with_time::window_with_time_observer232         static subscriber<T, observer_type> make(dest_type d, window_with_time_values v) {
233             auto cs = composite_subscription();
234             auto coordinator = v.coordination.create_coordinator();
235 
236             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
237         }
238     };
239 
240     template<class Subscriber>
operator ()rxcpp::operators::detail::window_with_time241     auto operator()(Subscriber dest) const
242         -> decltype(window_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
243         return      window_with_time_observer<Subscriber>::make(std::move(dest), initial);
244     }
245 };
246 
247 }
248 
249 /*! @copydoc rx-window_time.hpp
250 */
251 template<class... AN>
window_with_time(AN &&...an)252 auto window_with_time(AN&&... an)
253     ->      operator_factory<window_with_time_tag, AN...> {
254      return operator_factory<window_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
255 }
256 
257 }
258 
259 template<>
260 struct member_overload<window_with_time_tag>
261 {
262     template<class Observable, class Duration,
263         class Enabled = rxu::enable_if_all_true_type_t<
264             is_observable<Observable>,
265             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
266         class SourceValue = rxu::value_type_t<Observable>,
267         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
268         class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload269     static auto member(Observable&& o, Duration period)
270         -> decltype(o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()))) {
271         return      o.template lift<Value>(WindowWithTime(period, period, identity_current_thread()));
272     }
273 
274     template<class Observable, class Duration, class Coordination,
275         class Enabled = rxu::enable_if_all_true_type_t<
276             is_observable<Observable>,
277             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
278             is_coordination<Coordination>>,
279         class SourceValue = rxu::value_type_t<Observable>,
280         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
281         class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload282     static auto member(Observable&& o, Duration period, Coordination&& cn)
283         -> decltype(o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)))) {
284         return      o.template lift<Value>(WindowWithTime(period, period, std::forward<Coordination>(cn)));
285     }
286 
287     template<class Observable, class Duration,
288         class Enabled = rxu::enable_if_all_true_type_t<
289             is_observable<Observable>,
290             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
291         class SourceValue = rxu::value_type_t<Observable>,
292         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
293         class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload294     static auto member(Observable&& o, Duration&& period, Duration&& skip)
295         -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
296         return      o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
297     }
298 
299     template<class Observable, class Duration, class Coordination,
300         class Enabled = rxu::enable_if_all_true_type_t<
301             is_observable<Observable>,
302             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
303             is_coordination<Coordination>>,
304         class SourceValue = rxu::value_type_t<Observable>,
305         class WindowWithTime = rxo::detail::window_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
306         class Value = rxu::value_type_t<WindowWithTime>>
memberrxcpp::member_overload307     static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
308         -> decltype(o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
309         return      o.template lift<Value>(WindowWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
310     }
311 
312     template<class... AN>
memberrxcpp::member_overload313     static operators::detail::window_with_time_invalid_t<AN...> member(AN...) {
314         std::terminate();
315         return {};
316         static_assert(sizeof...(AN) == 10000, "window_with_time takes (Duration, optional Duration, optional Coordination)");
317     }
318 };
319 
320 }
321 
322 #endif
323