1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-buffer_time_count.hpp
6
7 \brief Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
8
9 \tparam Duration the type of the time interval.
10 \tparam Coordination the type of the scheduler (optional).
11
12 \param period the period of time each buffer collects items before it is emitted and replaced with a new buffer.
\param count         the maximum size of each buffer before it is emitted and a new buffer is created.
14 \param coordination the scheduler for the buffers (optional).
15
16 \return Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
17
18 \sample
19 \snippet buffer.cpp buffer period+count+coordination sample
20 \snippet output.txt buffer period+count+coordination sample
21
22 \sample
23 \snippet buffer.cpp buffer period+count sample
24 \snippet output.txt buffer period+count sample
25 */
26
27 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP
29
30 #include "../rx-includes.hpp"
31
32 namespace rxcpp {
33
34 namespace operators {
35
36 namespace detail {
37
// Empty tag type parameterized on an argument pack. It is used below as the
// value type of the observable named by buffer_with_time_or_count_invalid.
template<typename... AN>
struct buffer_with_time_or_count_invalid_arguments
{
};
40
41 template<class... AN>
42 struct buffer_with_time_or_count_invalid : public rxo::operator_base<buffer_with_time_or_count_invalid_arguments<AN...>> {
43 using type = observable<buffer_with_time_or_count_invalid_arguments<AN...>, buffer_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using buffer_with_time_or_count_invalid_t = typename buffer_with_time_or_count_invalid<AN...>::type;
47
48 template<class T, class Duration, class Coordination>
49 struct buffer_with_time_or_count
50 {
51 typedef rxu::decay_t<T> source_value_type;
52 typedef std::vector<source_value_type> value_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
55 typedef rxu::decay_t<Duration> duration_type;
56
57 struct buffer_with_time_or_count_values
58 {
buffer_with_time_or_count_valuesrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_values59 buffer_with_time_or_count_values(duration_type p, int n, coordination_type c)
60 : period(p)
61 , count(n)
62 , coordination(c)
63 {
64 }
65 duration_type period;
66 int count;
67 coordination_type coordination;
68 };
69 buffer_with_time_or_count_values initial;
70
buffer_with_time_or_countrxcpp::operators::detail::buffer_with_time_or_count71 buffer_with_time_or_count(duration_type period, int count, coordination_type coordination)
72 : initial(period, count, coordination)
73 {
74 }
75
76 template<class Subscriber>
77 struct buffer_with_time_or_count_observer
78 {
79 typedef buffer_with_time_or_count_observer<Subscriber> this_type;
80 typedef std::vector<T> value_type;
81 typedef rxu::decay_t<Subscriber> dest_type;
82 typedef observer<value_type, this_type> observer_type;
83
84 struct buffer_with_time_or_count_subscriber_values : public buffer_with_time_or_count_values
85 {
buffer_with_time_or_count_subscriber_valuesrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer::buffer_with_time_or_count_subscriber_values86 buffer_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
87 : buffer_with_time_or_count_values(std::move(v))
88 , cs(std::move(cs))
89 , dest(std::move(d))
90 , coordinator(std::move(c))
91 , worker(coordinator.get_worker())
92 , chunk_id(0)
93 {
94 }
95 composite_subscription cs;
96 dest_type dest;
97 coordinator_type coordinator;
98 rxsc::worker worker;
99 mutable int chunk_id;
100 mutable value_type chunk;
101 };
102 typedef std::shared_ptr<buffer_with_time_or_count_subscriber_values> state_type;
103 state_type state;
104
buffer_with_time_or_count_observerrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer105 buffer_with_time_or_count_observer(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
106 : state(std::make_shared<buffer_with_time_or_count_subscriber_values>(buffer_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
107 {
108 auto new_id = state->chunk_id;
109 auto produce_time = state->worker.now() + state->period;
110 auto localState = state;
111
112 auto disposer = [=](const rxsc::schedulable&){
113 localState->cs.unsubscribe();
114 localState->dest.unsubscribe();
115 localState->worker.unsubscribe();
116 };
117 auto selectedDisposer = on_exception(
118 [&](){return localState->coordinator.act(disposer);},
119 localState->dest);
120 if (selectedDisposer.empty()) {
121 return;
122 }
123
124 localState->dest.add([=](){
125 localState->worker.schedule(selectedDisposer.get());
126 });
127 localState->cs.add([=](){
128 localState->worker.schedule(selectedDisposer.get());
129 });
130
131 //
132 // The scheduler is FIFO for any time T. Since the observer is scheduling
133 // on_next/on_error/oncompleted the timed schedule calls must be resheduled
134 // when they occur to ensure that production happens after on_next/on_error/oncompleted
135 //
136
137 localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
138 localState->worker.schedule(produce_buffer(new_id, produce_time, localState));
139 });
140 }
141
produce_bufferrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer142 static std::function<void(const rxsc::schedulable&)> produce_buffer(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
143 auto produce = [id, expected, state](const rxsc::schedulable&) {
144 if (id != state->chunk_id)
145 return;
146
147 state->dest.on_next(state->chunk);
148 state->chunk.resize(0);
149 auto new_id = ++state->chunk_id;
150 auto produce_time = expected + state->period;
151 state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
152 state->worker.schedule(produce_buffer(new_id, produce_time, state));
153 });
154 };
155
156 auto selectedProduce = on_exception(
157 [&](){return state->coordinator.act(produce);},
158 state->dest);
159 if (selectedProduce.empty()) {
160 return std::function<void(const rxsc::schedulable&)>();
161 }
162
163 return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
164 }
165
on_nextrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer166 void on_next(T v) const {
167 auto localState = state;
168 auto work = [v, localState](const rxsc::schedulable& self){
169 localState->chunk.push_back(v);
170 if (int(localState->chunk.size()) == localState->count) {
171 produce_buffer(localState->chunk_id, localState->worker.now(), localState)(self);
172 }
173 };
174 auto selectedWork = on_exception(
175 [&](){return localState->coordinator.act(work);},
176 localState->dest);
177 if (selectedWork.empty()) {
178 return;
179 }
180 localState->worker.schedule(selectedWork.get());
181 }
on_errorrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer182 void on_error(rxu::error_ptr e) const {
183 auto localState = state;
184 auto work = [e, localState](const rxsc::schedulable&){
185 localState->dest.on_error(e);
186 };
187 auto selectedWork = on_exception(
188 [&](){return localState->coordinator.act(work);},
189 localState->dest);
190 if (selectedWork.empty()) {
191 return;
192 }
193 localState->worker.schedule(selectedWork.get());
194 }
on_completedrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer195 void on_completed() const {
196 auto localState = state;
197 auto work = [localState](const rxsc::schedulable&){
198 localState->dest.on_next(localState->chunk);
199 localState->dest.on_completed();
200 };
201 auto selectedWork = on_exception(
202 [&](){return localState->coordinator.act(work);},
203 localState->dest);
204 if (selectedWork.empty()) {
205 return;
206 }
207 localState->worker.schedule(selectedWork.get());
208 }
209
makerxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer210 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_or_count_values v) {
211 auto cs = composite_subscription();
212 auto coordinator = v.coordination.create_coordinator();
213
214 return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
215 }
216 };
217
218 template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_with_time_or_count219 auto operator()(Subscriber dest) const
220 -> decltype(buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
221 return buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
222 }
223 };
224
225 }
226
227 /*! @copydoc rx-buffer_time_count.hpp
228 */
229 template<class... AN>
buffer_with_time_or_count(AN &&...an)230 auto buffer_with_time_or_count(AN&&... an)
231 -> operator_factory<buffer_with_time_or_count_tag, AN...> {
232 return operator_factory<buffer_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234
235 }
236
237 template<>
238 struct member_overload<buffer_with_time_or_count_tag>
239 {
240 template<class Observable, class Duration,
241 class Enabled = rxu::enable_if_all_true_type_t<
242 is_observable<Observable>,
243 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
244 class SourceValue = rxu::value_type_t<Observable>,
245 class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
246 class Value = rxu::value_type_t<BufferTimeCount>>
memberrxcpp::member_overload247 static auto member(Observable&& o, Duration&& period, int count)
248 -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
249 return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
250 }
251
252 template<class Observable, class Duration, class Coordination,
253 class Enabled = rxu::enable_if_all_true_type_t<
254 is_observable<Observable>,
255 std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
256 is_coordination<Coordination>>,
257 class SourceValue = rxu::value_type_t<Observable>,
258 class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
259 class Value = rxu::value_type_t<BufferTimeCount>>
memberrxcpp::member_overload260 static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
261 -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
262 return o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
263 }
264
265 template<class... AN>
memberrxcpp::member_overload266 static operators::detail::buffer_with_time_or_count_invalid_t<AN...> member(AN...) {
267 std::terminate();
268 return {};
269 static_assert(sizeof...(AN) == 10000, "buffer_with_time_or_count takes (Duration, Count, optional Coordination)");
270 }
271 };
272
273 }
274
275 #endif
276