1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-window.hpp
6
7 \brief Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
8 If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable.
9
10 \param count the maximum size of each window before it should be completed
11 \param skip how many items need to be skipped before starting a new window
12
13 \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable.
14 If the skip parameter is set, return an Observable that emits windows every skip items containing at most count items from the source observable.
15
16 \sample
17 \snippet window.cpp window count+skip sample
18 \snippet output.txt window count+skip sample
19
20 \sample
21 \snippet window.cpp window count sample
22 \snippet output.txt window count sample
23 */
24
25 #if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP)
26 #define RXCPP_OPERATORS_RX_WINDOW_HPP
27
28 #include "../rx-includes.hpp"
29
30 namespace rxcpp {
31
32 namespace operators {
33
34 namespace detail {
35
36 template<class... AN>
37 struct window_invalid_arguments {};
38
39 template<class... AN>
40 struct window_invalid : public rxo::operator_base<window_invalid_arguments<AN...>> {
41 using type = observable<window_invalid_arguments<AN...>, window_invalid<AN...>>;
42 };
43 template<class... AN>
44 using window_invalid_t = typename window_invalid<AN...>::type;
45
46 template<class T>
47 struct window
48 {
49 typedef rxu::decay_t<T> source_value_type;
50 typedef observable<source_value_type> value_type;
51
52 struct window_values
53 {
window_valuesrxcpp::operators::detail::window::window_values54 window_values(int c, int s)
55 : count(c)
56 , skip(s)
57 {
58 }
59 int count;
60 int skip;
61 };
62
63 window_values initial;
64
windowrxcpp::operators::detail::window65 window(int count, int skip)
66 : initial(count, skip)
67 {
68 }
69
70 template<class Subscriber>
71 struct window_observer : public window_values
72 {
73 typedef window_observer<Subscriber> this_type;
74 typedef rxu::decay_t<T> value_type;
75 typedef rxu::decay_t<Subscriber> dest_type;
76 typedef observer<T, this_type> observer_type;
77 dest_type dest;
78 mutable int cursor;
79 mutable std::deque<rxcpp::subjects::subject<T>> subj;
80
window_observerrxcpp::operators::detail::window::window_observer81 window_observer(dest_type d, window_values v)
82 : window_values(v)
83 , dest(std::move(d))
84 , cursor(0)
85 {
86 subj.push_back(rxcpp::subjects::subject<T>());
87 dest.on_next(subj[0].get_observable().as_dynamic());
88 }
on_nextrxcpp::operators::detail::window::window_observer89 void on_next(T v) const {
90 for (auto s : subj) {
91 s.get_subscriber().on_next(v);
92 }
93
94 int c = cursor - this->count + 1;
95 if (c >= 0 && c % this->skip == 0) {
96 subj[0].get_subscriber().on_completed();
97 subj.pop_front();
98 }
99
100 if (++cursor % this->skip == 0) {
101 subj.push_back(rxcpp::subjects::subject<T>());
102 dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic());
103 }
104 }
105
on_errorrxcpp::operators::detail::window::window_observer106 void on_error(rxu::error_ptr e) const {
107 for (auto s : subj) {
108 s.get_subscriber().on_error(e);
109 }
110 dest.on_error(e);
111 }
112
on_completedrxcpp::operators::detail::window::window_observer113 void on_completed() const {
114 for (auto s : subj) {
115 s.get_subscriber().on_completed();
116 }
117 dest.on_completed();
118 }
119
makerxcpp::operators::detail::window::window_observer120 static subscriber<T, observer_type> make(dest_type d, window_values v) {
121 auto cs = d.get_subscription();
122 return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(v))));
123 }
124 };
125
126 template<class Subscriber>
operator ()rxcpp::operators::detail::window127 auto operator()(Subscriber dest) const
128 -> decltype(window_observer<Subscriber>::make(std::move(dest), initial)) {
129 return window_observer<Subscriber>::make(std::move(dest), initial);
130 }
131 };
132
133 }
134
135 /*! @copydoc rx-window.hpp
136 */
137 template<class... AN>
window(AN &&...an)138 auto window(AN&&... an)
139 -> operator_factory<window_tag, AN...> {
140 return operator_factory<window_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
141 }
142
143 }
144
145 template<>
146 struct member_overload<window_tag>
147 {
148 template<class Observable,
149 class Enabled = rxu::enable_if_all_true_type_t<
150 is_observable<Observable>>,
151 class SourceValue = rxu::value_type_t<Observable>,
152 class Window = rxo::detail::window<SourceValue>,
153 class Value = rxu::value_type_t<Window>>
memberrxcpp::member_overload154 static auto member(Observable&& o, int count, int skip)
155 -> decltype(o.template lift<Value>(Window(count, skip))) {
156 return o.template lift<Value>(Window(count, skip));
157 }
158
159 template<class Observable,
160 class Enabled = rxu::enable_if_all_true_type_t<
161 is_observable<Observable>>,
162 class SourceValue = rxu::value_type_t<Observable>,
163 class Window = rxo::detail::window<SourceValue>,
164 class Value = rxu::value_type_t<Window>>
memberrxcpp::member_overload165 static auto member(Observable&& o, int count)
166 -> decltype(o.template lift<Value>(Window(count, count))) {
167 return o.template lift<Value>(Window(count, count));
168 }
169
170 template<class... AN>
memberrxcpp::member_overload171 static operators::detail::window_invalid_t<AN...> member(AN...) {
172 std::terminate();
173 return {};
174 static_assert(sizeof...(AN) == 10000, "window takes (Count, optional Skip)");
175 }
176 };
177
178 }
179
180 #endif
181