1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-buffer_time.hpp
6
7 \brief Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
8 If the skip parameter is set, Return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
9
10 \tparam Duration the type of the time interval
11 \tparam Coordination the type of the scheduler (optional).
12
13 \param period the period of time each buffer collects items before it is emitted.
14 \param skip the period of time after which a new buffer will be created (optional).
15 \param coordination the scheduler for the buffers (optional).
16
17 \return Observable that emits buffers every period time interval and collect items from this observable for period of time into each produced buffer.
18 If the skip parameter is set, return an Observable that emits buffers every skip time interval and collect items from this observable for period of time into each produced buffer.
19
20 \sample
21 \snippet buffer.cpp buffer period+skip+coordination sample
22 \snippet output.txt buffer period+skip+coordination sample
23
24 \sample
25 \snippet buffer.cpp buffer period+skip sample
26 \snippet output.txt buffer period+skip sample
27
28 Overlapping buffers are allowed:
29 \snippet buffer.cpp buffer period+skip overlapping sample
30 \snippet output.txt buffer period+skip overlapping sample
31
32 If no items are emitted, an empty buffer is returned:
33 \snippet buffer.cpp buffer period+skip empty sample
34 \snippet output.txt buffer period+skip empty sample
35
36 \sample
37 \snippet buffer.cpp buffer period+coordination sample
38 \snippet output.txt buffer period+coordination sample
39
40 \sample
41 \snippet buffer.cpp buffer period sample
42 \snippet output.txt buffer period sample
43 */
44
45 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP)
46 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP
47
48 #include "../rx-includes.hpp"
49
50 namespace rxcpp {
51
52 namespace operators {
53
54 namespace detail {
55
56 template<class... AN>
57 struct buffer_with_time_invalid_arguments {};
58
59 template<class... AN>
60 struct buffer_with_time_invalid : public rxo::operator_base<buffer_with_time_invalid_arguments<AN...>> {
61 using type = observable<buffer_with_time_invalid_arguments<AN...>, buffer_with_time_invalid<AN...>>;
62 };
63 template<class... AN>
64 using buffer_with_time_invalid_t = typename buffer_with_time_invalid<AN...>::type;
65
66 template<class T, class Duration, class Coordination>
67 struct buffer_with_time
68 {
69 typedef rxu::decay_t<T> source_value_type;
70 typedef std::vector<source_value_type> value_type;
71 typedef rxu::decay_t<Coordination> coordination_type;
72 typedef typename coordination_type::coordinator_type coordinator_type;
73 typedef rxu::decay_t<Duration> duration_type;
74
75 struct buffer_with_time_values
76 {
buffer_with_time_valuesrxcpp::operators::detail::buffer_with_time::buffer_with_time_values77 buffer_with_time_values(duration_type p, duration_type s, coordination_type c)
78 : period(p)
79 , skip(s)
80 , coordination(c)
81 {
82 }
83 duration_type period;
84 duration_type skip;
85 coordination_type coordination;
86 };
87 buffer_with_time_values initial;
88
buffer_with_timerxcpp::operators::detail::buffer_with_time89 buffer_with_time(duration_type period, duration_type skip, coordination_type coordination)
90 : initial(period, skip, coordination)
91 {
92 }
93
94 template<class Subscriber>
95 struct buffer_with_time_observer
96 {
97 typedef buffer_with_time_observer<Subscriber> this_type;
98 typedef std::vector<T> value_type;
99 typedef rxu::decay_t<Subscriber> dest_type;
100 typedef observer<value_type, this_type> observer_type;
101
102 struct buffer_with_time_subscriber_values : public buffer_with_time_values
103 {
buffer_with_time_subscriber_valuesrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer::buffer_with_time_subscriber_values104 buffer_with_time_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
105 : buffer_with_time_values(v)
106 , cs(std::move(cs))
107 , dest(std::move(d))
108 , coordinator(std::move(c))
109 , worker(coordinator.get_worker())
110 , expected(worker.now())
111 {
112 }
113 composite_subscription cs;
114 dest_type dest;
115 coordinator_type coordinator;
116 rxsc::worker worker;
117 mutable std::deque<value_type> chunks;
118 rxsc::scheduler::clock_type::time_point expected;
119 };
120 std::shared_ptr<buffer_with_time_subscriber_values> state;
121
buffer_with_time_observerrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer122 buffer_with_time_observer(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
123 : state(std::make_shared<buffer_with_time_subscriber_values>(buffer_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
124 {
125 auto localState = state;
126
127 auto disposer = [=](const rxsc::schedulable&){
128 localState->cs.unsubscribe();
129 localState->dest.unsubscribe();
130 localState->worker.unsubscribe();
131 };
132 auto selectedDisposer = on_exception(
133 [&](){return localState->coordinator.act(disposer);},
134 localState->dest);
135 if (selectedDisposer.empty()) {
136 return;
137 }
138
139 localState->dest.add([=](){
140 localState->worker.schedule(selectedDisposer.get());
141 });
142 localState->cs.add([=](){
143 localState->worker.schedule(selectedDisposer.get());
144 });
145
146 //
147 // The scheduler is FIFO for any time T. Since the observer is scheduling
148 // on_next/on_error/oncompleted the timed schedule calls must be resheduled
149 // when they occur to ensure that production happens after on_next/on_error/oncompleted
150 //
151
152 auto produce_buffer = [localState](const rxsc::schedulable&) {
153 localState->dest.on_next(std::move(localState->chunks.front()));
154 localState->chunks.pop_front();
155 };
156 auto selectedProduce = on_exception(
157 [&](){return localState->coordinator.act(produce_buffer);},
158 localState->dest);
159 if (selectedProduce.empty()) {
160 return;
161 }
162
163 auto create_buffer = [localState, selectedProduce](const rxsc::schedulable&) {
164 localState->chunks.emplace_back();
165 auto produce_at = localState->expected + localState->period;
166 localState->expected += localState->skip;
167 localState->worker.schedule(produce_at, [localState, selectedProduce](const rxsc::schedulable&) {
168 localState->worker.schedule(selectedProduce.get());
169 });
170 };
171 auto selectedCreate = on_exception(
172 [&](){return localState->coordinator.act(create_buffer);},
173 localState->dest);
174 if (selectedCreate.empty()) {
175 return;
176 }
177
178 state->worker.schedule_periodically(
179 state->expected,
180 state->skip,
181 [localState, selectedCreate](const rxsc::schedulable&) {
182 localState->worker.schedule(selectedCreate.get());
183 });
184 }
on_nextrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer185 void on_next(T v) const {
186 auto localState = state;
187 auto work = [v, localState](const rxsc::schedulable&){
188 for(auto& chunk : localState->chunks) {
189 chunk.push_back(v);
190 }
191 };
192 auto selectedWork = on_exception(
193 [&](){return localState->coordinator.act(work);},
194 localState->dest);
195 if (selectedWork.empty()) {
196 return;
197 }
198 localState->worker.schedule(selectedWork.get());
199 }
on_errorrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer200 void on_error(rxu::error_ptr e) const {
201 auto localState = state;
202 auto work = [e, localState](const rxsc::schedulable&){
203 localState->dest.on_error(e);
204 };
205 auto selectedWork = on_exception(
206 [&](){return localState->coordinator.act(work);},
207 localState->dest);
208 if (selectedWork.empty()) {
209 return;
210 }
211 localState->worker.schedule(selectedWork.get());
212 }
on_completedrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer213 void on_completed() const {
214 auto localState = state;
215 auto work = [localState](const rxsc::schedulable&){
216 on_exception(
217 [&](){
218 while (!localState->chunks.empty()) {
219 localState->dest.on_next(std::move(localState->chunks.front()));
220 localState->chunks.pop_front();
221 }
222 return true;
223 },
224 localState->dest);
225 localState->dest.on_completed();
226 };
227 auto selectedWork = on_exception(
228 [&](){return localState->coordinator.act(work);},
229 localState->dest);
230 if (selectedWork.empty()) {
231 return;
232 }
233 localState->worker.schedule(selectedWork.get());
234 }
235
makerxcpp::operators::detail::buffer_with_time::buffer_with_time_observer236 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_values v) {
237 auto cs = composite_subscription();
238 auto coordinator = v.coordination.create_coordinator();
239
240 return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
241 }
242 };
243
244 template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_with_time245 auto operator()(Subscriber dest) const
246 -> decltype(buffer_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
247 return buffer_with_time_observer<Subscriber>::make(std::move(dest), initial);
248 }
249 };
250
251 }
252
253 /*! @copydoc rx-buffer_time.hpp
254 */
255 template<class... AN>
buffer_with_time(AN &&...an)256 auto buffer_with_time(AN&&... an)
257 -> operator_factory<buffer_with_time_tag, AN...> {
258 return operator_factory<buffer_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
259 }
260
261 }
262
263 template<>
264 struct member_overload<buffer_with_time_tag>
265 {
266 template<class Observable, class Duration,
267 class Enabled = rxu::enable_if_all_true_type_t<
268 is_observable<Observable>,
269 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
270 class SourceValue = rxu::value_type_t<Observable>,
271 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
272 class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload273 static auto member(Observable&& o, Duration period)
274 -> decltype(o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()))) {
275 return o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()));
276 }
277
278 template<class Observable, class Duration, class Coordination,
279 class Enabled = rxu::enable_if_all_true_type_t<
280 is_observable<Observable>,
281 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
282 is_coordination<Coordination>>,
283 class SourceValue = rxu::value_type_t<Observable>,
284 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
285 class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload286 static auto member(Observable&& o, Duration period, Coordination&& cn)
287 -> decltype(o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)))) {
288 return o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)));
289 }
290
291 template<class Observable, class Duration,
292 class Enabled = rxu::enable_if_all_true_type_t<
293 is_observable<Observable>,
294 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
295 class SourceValue = rxu::value_type_t<Observable>,
296 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
297 class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload298 static auto member(Observable&& o, Duration&& period, Duration&& skip)
299 -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
300 return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
301 }
302
303 template<class Observable, class Duration, class Coordination,
304 class Enabled = rxu::enable_if_all_true_type_t<
305 is_observable<Observable>,
306 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
307 is_coordination<Coordination>>,
308 class SourceValue = rxu::value_type_t<Observable>,
309 class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
310 class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload311 static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
312 -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
313 return o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
314 }
315
316 template<class... AN>
memberrxcpp::member_overload317 static operators::detail::buffer_with_time_invalid_t<AN...> member(AN...) {
318 std::terminate();
319 return {};
320 static_assert(sizeof...(AN) == 10000, "buffer_with_time takes (Duration, optional Duration, optional Coordination)");
321 }
322 };
323
324 }
325
326 #endif
327