1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-window_time_count.hpp
6
7 \brief Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
8
9 \tparam Duration the type of time intervals.
10 \tparam Coordination the type of the scheduler (optional).
11
12 \param period the period of time each window collects items before it is completed and replaced with a new window.
\param count the maximum number of items each window may receive before it is completed and a new window is created.
14 \param coordination the scheduler for the windows (optional).
15
16 \return Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
17
18 \sample
19 \snippet window.cpp window period+count+coordination sample
20 \snippet output.txt window period+count+coordination sample
21
22 \sample
23 \snippet window.cpp window period+count sample
24 \snippet output.txt window period+count sample
25 */
26
27 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP
29
30 #include "../rx-includes.hpp"
31
32 namespace rxcpp {
33
34 namespace operators {
35
36 namespace detail {
37
// Empty tag type that records the argument types of a call to
// window_with_time_or_count that matched no supported overload.
template<class... AN>
struct window_with_time_or_count_invalid_arguments
{
};
40
41 template<class... AN>
42 struct window_with_time_or_count_invalid : public rxo::operator_base<window_with_time_or_count_invalid_arguments<AN...>> {
43 using type = observable<window_with_time_or_count_invalid_arguments<AN...>, window_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using window_with_time_or_count_invalid_t = typename window_with_time_or_count_invalid<AN...>::type;
47
48 template<class T, class Duration, class Coordination>
49 struct window_with_time_or_count
50 {
51 typedef rxu::decay_t<T> source_value_type;
52 typedef observable<source_value_type> value_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
55 typedef rxu::decay_t<Duration> duration_type;
56
57 struct window_with_time_or_count_values
58 {
window_with_time_or_count_valuesrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_values59 window_with_time_or_count_values(duration_type p, int n, coordination_type c)
60 : period(p)
61 , count(n)
62 , coordination(c)
63 {
64 }
65 duration_type period;
66 int count;
67 coordination_type coordination;
68 };
69 window_with_time_or_count_values initial;
70
window_with_time_or_countrxcpp::operators::detail::window_with_time_or_count71 window_with_time_or_count(duration_type period, int count, coordination_type coordination)
72 : initial(period, count, coordination)
73 {
74 }
75
76 template<class Subscriber>
77 struct window_with_time_or_count_observer
78 {
79 typedef window_with_time_or_count_observer<Subscriber> this_type;
80 typedef rxu::decay_t<T> value_type;
81 typedef rxu::decay_t<Subscriber> dest_type;
82 typedef observer<T, this_type> observer_type;
83
84 struct window_with_time_or_count_subscriber_values : public window_with_time_or_count_values
85 {
window_with_time_or_count_subscriber_valuesrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer::window_with_time_or_count_subscriber_values86 window_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
87 : window_with_time_or_count_values(std::move(v))
88 , cs(std::move(cs))
89 , dest(std::move(d))
90 , coordinator(std::move(c))
91 , worker(coordinator.get_worker())
92 , cursor(0)
93 , subj_id(0)
94 {
95 }
96 composite_subscription cs;
97 dest_type dest;
98 coordinator_type coordinator;
99 rxsc::worker worker;
100 mutable int cursor;
101 mutable int subj_id;
102 mutable rxcpp::subjects::subject<T> subj;
103 };
104 typedef std::shared_ptr<window_with_time_or_count_subscriber_values> state_type;
105 state_type state;
106
window_with_time_or_count_observerrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer107 window_with_time_or_count_observer(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
108 : state(std::make_shared<window_with_time_or_count_subscriber_values>(window_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
109 {
110 auto new_id = state->subj_id;
111 auto produce_time = state->worker.now();
112 auto localState = state;
113
114 auto disposer = [=](const rxsc::schedulable&){
115 localState->cs.unsubscribe();
116 localState->dest.unsubscribe();
117 localState->worker.unsubscribe();
118 };
119 auto selectedDisposer = on_exception(
120 [&](){return localState->coordinator.act(disposer);},
121 localState->dest);
122 if (selectedDisposer.empty()) {
123 return;
124 }
125
126 localState->dest.add([=](){
127 localState->worker.schedule(selectedDisposer.get());
128 });
129 localState->cs.add([=](){
130 localState->worker.schedule(selectedDisposer.get());
131 });
132
133 //
134 // The scheduler is FIFO for any time T. Since the observer is scheduling
135 // on_next/on_error/oncompleted the timed schedule calls must be resheduled
136 // when they occur to ensure that production happens after on_next/on_error/oncompleted
137 //
138
139 localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
140 localState->worker.schedule(release_window(new_id, produce_time, localState));
141 });
142 }
143
release_windowrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer144 static std::function<void(const rxsc::schedulable&)> release_window(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
145 auto release = [id, expected, state](const rxsc::schedulable&) {
146 if (id != state->subj_id)
147 return;
148
149 state->subj.get_subscriber().on_completed();
150 state->subj = rxcpp::subjects::subject<T>();
151 state->dest.on_next(state->subj.get_observable().as_dynamic());
152 state->cursor = 0;
153 auto new_id = ++state->subj_id;
154 auto produce_time = expected + state->period;
155 state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
156 state->worker.schedule(release_window(new_id, produce_time, state));
157 });
158 };
159 auto selectedRelease = on_exception(
160 [&](){return state->coordinator.act(release);},
161 state->dest);
162 if (selectedRelease.empty()) {
163 return std::function<void(const rxsc::schedulable&)>();
164 }
165
166 return std::function<void(const rxsc::schedulable&)>(selectedRelease.get());
167 }
168
on_nextrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer169 void on_next(T v) const {
170 auto localState = state;
171 auto work = [v, localState](const rxsc::schedulable& self){
172 localState->subj.get_subscriber().on_next(v);
173 if (++localState->cursor == localState->count) {
174 release_window(localState->subj_id, localState->worker.now(), localState)(self);
175 }
176 };
177 auto selectedWork = on_exception(
178 [&](){return localState->coordinator.act(work);},
179 localState->dest);
180 if (selectedWork.empty()) {
181 return;
182 }
183 localState->worker.schedule(selectedWork.get());
184 }
185
on_errorrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer186 void on_error(rxu::error_ptr e) const {
187 auto localState = state;
188 auto work = [e, localState](const rxsc::schedulable&){
189 localState->subj.get_subscriber().on_error(e);
190 localState->dest.on_error(e);
191 };
192 auto selectedWork = on_exception(
193 [&](){return localState->coordinator.act(work);},
194 localState->dest);
195 if (selectedWork.empty()) {
196 return;
197 }
198 localState->worker.schedule(selectedWork.get());
199 }
200
on_completedrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer201 void on_completed() const {
202 auto localState = state;
203 auto work = [localState](const rxsc::schedulable&){
204 localState->subj.get_subscriber().on_completed();
205 localState->dest.on_completed();
206 };
207 auto selectedWork = on_exception(
208 [&](){return localState->coordinator.act(work);},
209 localState->dest);
210 if (selectedWork.empty()) {
211 return;
212 }
213 localState->worker.schedule(selectedWork.get());
214 }
215
makerxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer216 static subscriber<T, observer_type> make(dest_type d, window_with_time_or_count_values v) {
217 auto cs = composite_subscription();
218 auto coordinator = v.coordination.create_coordinator();
219
220 return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
221 }
222 };
223
224 template<class Subscriber>
operator ()rxcpp::operators::detail::window_with_time_or_count225 auto operator()(Subscriber dest) const
226 -> decltype(window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
227 return window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
228 }
229 };
230
231 }
232
233 /*! @copydoc rx-window_time_count.hpp
234 */
235 template<class... AN>
window_with_time_or_count(AN &&...an)236 auto window_with_time_or_count(AN&&... an)
237 -> operator_factory<window_with_time_or_count_tag, AN...> {
238 return operator_factory<window_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
239 }
240
241 }
242
243 template<>
244 struct member_overload<window_with_time_or_count_tag>
245 {
246 template<class Observable, class Duration,
247 class Enabled = rxu::enable_if_all_true_type_t<
248 is_observable<Observable>,
249 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
250 class SourceValue = rxu::value_type_t<Observable>,
251 class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
252 class Value = rxu::value_type_t<WindowTimeCount>>
memberrxcpp::member_overload253 static auto member(Observable&& o, Duration&& period, int count)
254 -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
255 return o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
256 }
257
258 template<class Observable, class Duration, class Coordination,
259 class Enabled = rxu::enable_if_all_true_type_t<
260 is_observable<Observable>,
261 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
262 is_coordination<Coordination>>,
263 class SourceValue = rxu::value_type_t<Observable>,
264 class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
265 class Value = rxu::value_type_t<WindowTimeCount>>
memberrxcpp::member_overload266 static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
267 -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
268 return o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
269 }
270
271 template<class... AN>
memberrxcpp::member_overload272 static operators::detail::window_with_time_or_count_invalid_t<AN...> member(AN...) {
273 std::terminate();
274 return {};
275 static_assert(sizeof...(AN) == 10000, "window_with_time_or_count takes (Duration, Count, optional Coordination)");
276 }
277 };
278
279 }
280
281 #endif
282