• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
/*! \file rx-window.hpp

    \brief Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
           If the skip parameter is set, return an observable that opens a new window every skip items, each containing at most count items from the source observable.
           With skip < count consecutive windows overlap; with skip > count the items between two windows are not emitted in any window.

    \param count  the maximum size of each window before it should be completed
    \param skip   how many items need to be skipped before starting a new window

    \return  Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
             If the skip parameter is set, return an Observable that opens a new window every skip items, each containing at most count items from the source observable.

    \sample
    \snippet window.cpp window count+skip sample
    \snippet output.txt window count+skip sample

    \sample
    \snippet window.cpp window count sample
    \snippet output.txt window count sample
*/
24 
25 #if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP)
26 #define RXCPP_OPERATORS_RX_WINDOW_HPP
27 
28 #include "../rx-includes.hpp"
29 
30 namespace rxcpp {
31 
32 namespace operators {
33 
34 namespace detail {
35 
/// Empty tag type parameterised on the argument types of a window() call.
/// It is used as the value type of the observable produced for an
/// unsupported argument list.
template<class... AN>
struct window_invalid_arguments {};
38 
39 template<class... AN>
40 struct window_invalid : public rxo::operator_base<window_invalid_arguments<AN...>> {
41     using type = observable<window_invalid_arguments<AN...>, window_invalid<AN...>>;
42 };
43 template<class... AN>
44 using window_invalid_t = typename window_invalid<AN...>::type;
45 
46 template<class T>
47 struct window
48 {
49     typedef rxu::decay_t<T> source_value_type;
50     typedef observable<source_value_type> value_type;
51 
52     struct window_values
53     {
window_valuesrxcpp::operators::detail::window::window_values54         window_values(int c, int s)
55             : count(c)
56             , skip(s)
57         {
58         }
59         int count;
60         int skip;
61     };
62 
63     window_values initial;
64 
windowrxcpp::operators::detail::window65     window(int count, int skip)
66         : initial(count, skip)
67     {
68     }
69 
70     template<class Subscriber>
71     struct window_observer : public window_values
72     {
73         typedef window_observer<Subscriber> this_type;
74         typedef rxu::decay_t<T> value_type;
75         typedef rxu::decay_t<Subscriber> dest_type;
76         typedef observer<T, this_type> observer_type;
77         dest_type dest;
78         mutable int cursor;
79         mutable std::deque<rxcpp::subjects::subject<T>> subj;
80 
window_observerrxcpp::operators::detail::window::window_observer81         window_observer(dest_type d, window_values v)
82             : window_values(v)
83             , dest(std::move(d))
84             , cursor(0)
85         {
86             subj.push_back(rxcpp::subjects::subject<T>());
87             dest.on_next(subj[0].get_observable().as_dynamic());
88         }
on_nextrxcpp::operators::detail::window::window_observer89         void on_next(T v) const {
90             for (auto s : subj) {
91                 s.get_subscriber().on_next(v);
92             }
93 
94             int c = cursor - this->count + 1;
95             if (c >= 0 && c % this->skip == 0) {
96                 subj[0].get_subscriber().on_completed();
97                 subj.pop_front();
98             }
99 
100             if (++cursor % this->skip == 0) {
101                 subj.push_back(rxcpp::subjects::subject<T>());
102                 dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic());
103             }
104         }
105 
on_errorrxcpp::operators::detail::window::window_observer106         void on_error(rxu::error_ptr e) const {
107             for (auto s : subj) {
108                 s.get_subscriber().on_error(e);
109             }
110             dest.on_error(e);
111         }
112 
on_completedrxcpp::operators::detail::window::window_observer113         void on_completed() const {
114             for (auto s : subj) {
115                 s.get_subscriber().on_completed();
116             }
117             dest.on_completed();
118         }
119 
makerxcpp::operators::detail::window::window_observer120         static subscriber<T, observer_type> make(dest_type d, window_values v) {
121             auto cs = d.get_subscription();
122             return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
123         }
124     };
125 
126     template<class Subscriber>
operator ()rxcpp::operators::detail::window127     auto operator()(Subscriber dest) const
128         -> decltype(window_observer<Subscriber>::make(std::move(dest), initial)) {
129         return      window_observer<Subscriber>::make(std::move(dest), initial);
130     }
131 };
132 
133 }
134 
135 /*! @copydoc rx-window.hpp
136 */
137 template<class... AN>
window(AN &&...an)138 auto window(AN&&... an)
139     ->      operator_factory<window_tag, AN...> {
140      return operator_factory<window_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
141 }
142 
143 }
144 
145 template<>
146 struct member_overload<window_tag>
147 {
148     template<class Observable,
149         class Enabled = rxu::enable_if_all_true_type_t<
150             is_observable<Observable>>,
151         class SourceValue = rxu::value_type_t<Observable>,
152         class Window = rxo::detail::window<SourceValue>,
153         class Value = rxu::value_type_t<Window>>
memberrxcpp::member_overload154     static auto member(Observable&& o, int count, int skip)
155         -> decltype(o.template lift<Value>(Window(count, skip))) {
156         return      o.template lift<Value>(Window(count, skip));
157     }
158 
159      template<class Observable,
160         class Enabled = rxu::enable_if_all_true_type_t<
161             is_observable<Observable>>,
162         class SourceValue = rxu::value_type_t<Observable>,
163         class Window = rxo::detail::window<SourceValue>,
164         class Value = rxu::value_type_t<Window>>
memberrxcpp::member_overload165     static auto member(Observable&& o, int count)
166         -> decltype(o.template lift<Value>(Window(count, count))) {
167         return      o.template lift<Value>(Window(count, count));
168     }
169 
170     template<class... AN>
memberrxcpp::member_overload171     static operators::detail::window_invalid_t<AN...> member(AN...) {
172         std::terminate();
173         return {};
174         static_assert(sizeof...(AN) == 10000, "window takes (Count, optional Skip)");
175     }
176 };
177 
178 }
179 
180 #endif
181